赞
踩
发送验证码功能:
@Override public Result sendCode(String phone, HttpSession session) { //1.校验手机号 if(RegexUtils.isPhoneInvalid(phone)){ //2.如果不符合,返回错误信息 return Result.fail("手机号格式错误!"); } //3.符合,生成验证码 String code = RandomUtil.randomNumbers(6); //4.保存验证码到session session.setAttribute("code",code); //5.发送验证码 log.debug("发送短信验证码成功,验证码:{}"+code); //返回ok return Result.ok(); }
登录功能:
登录表单的实体类:
@Data
public class LoginFormDTO {
private String phone;
private String code;
private String password;
}
登录逻辑代码实现:
@Override public Result login(LoginFormDTO loginForm, HttpSession session) { //1.校验手机号 String phone = loginForm.getPhone(); if(RegexUtils.isPhoneInvalid(phone)){ //如果不符合,返回错误信息 return Result.fail("手机号格式错误!"); } //2.校验验证码 Object cacheCode = session.getAttribute("code"); String code = loginForm.getCode(); if(cacheCode==null ||! cacheCode.toString().equals(code)){ //3.不一致,报错 return Result.fail("验证码错误!"); } //4.一致,根据手机号查询用户 select * from tb_user where phone = ? User user = query().eq("phone", phone).one(); //5.判断用户是否存在 if(user==null) { //6.不存在,创建新用户并保存 user=createUsrWithPhone(phone); } //7.保存用户信息到session中 session.setAttribute("user",user); return null; } private User createUsrWithPhone(String phone) { //1.创建用户 User user = new User(); user.setPhone(phone); user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10)); //2.保存用户 save(user); return user; }
ThreadLocal 叫做本地线程变量,意思是说,ThreadLocal 中填充的的是当前线程的变量,该变量对其他线程而言是封闭且隔离的,ThreadLocal 为变量在每个线程中创建了一个副本,这样每个线程都可以访问自己内部的副本变量。
注意,为了隐藏用户敏感信息,也为了节省ThreadLocal的空间,需要将User转为UserDTO返回给前端。
可以通过hutool工具类,在UserService里修改:
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
public class UserHolder {
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
public static void saveUser(UserDTO user){
tl.set(user);
}
public static UserDTO getUser(){
return tl.get();
}
public static void removeUser(){
tl.remove();
}
}
拦截器:
public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { //1.获取session HttpSession session = request.getSession(); //2.获取session中的用户 Object user = session.getAttribute("user"); //3.判断用户是否存在 if(user==null) { //4.不存在,拦截,返回401状态码,代表未授权 response.setStatus(401); return false; } //5.存在,保存用户信息到ThreadLocal UserHolder.saveUser((UserDTO) user); //6.放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
添加拦截器:
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/voucher/**","/upload/**");
}
}
session共享问题:多态Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题
session的替代方案应该满足:
数据共享
内存存储
key、value结构
===>redis
拦截器修改:
public class LoginInterceptor implements HandlerInterceptor { private StringRedisTemplate stringRedisTemplate; public LoginInterceptor(StringRedisTemplate stringRedisTemplate){ this.stringRedisTemplate=stringRedisTemplate; } @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { //1.获取请求头中的token String token = request.getHeader("authorization"); //判断token是否为空 if(StrUtil.isBlank(token)){ response.setStatus(401); return false; } String key=LOGIN_USER_KEY+token; //2.基于token获取redis中的用户 Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key); //3.判断用户是否存在 if(userMap.isEmpty()){ //不存在,拦截,返回401状态码 response.setStatus(401); return false; } //5.存在,将查询到的Hash数据转为UserDTO对象 UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false); //6.保存用户到ThreadLocal UserHolder.saveUser(userDTO); //7.刷新token有效期 stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES); //8.放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
MvcConfig修改:
@Configuration public class MvcConfig implements WebMvcConfigurer { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new LoginInterceptor(stringRedisTemplate)) .excludePathPatterns( "/user/code", "/user/login", "/blog/hot", "/shop/**", "/shop-type/**", "/voucher/**", "/upload/**"); } }
UserServiceImpl修改:
@Service public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public Result sendCode(String phone, HttpSession session) { //1.校验手机号 if(RegexUtils.isPhoneInvalid(phone)){ //2.如果不符合,返回错误信息 return Result.fail("手机号格式错误!"); } //3.符合,生成验证码 String code = RandomUtil.randomNumbers(6); //4.保存验证码到redis中 stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES); //5.发送验证码 log.debug("发送短信验证码成功,验证码:{}"+code); //返回ok return Result.ok(); } @Override public Result login(LoginFormDTO loginForm, HttpSession session) { //1.校验手机号 String phone = loginForm.getPhone(); if(RegexUtils.isPhoneInvalid(phone)){ //如果不符合,返回错误信息 return Result.fail("手机号格式错误!"); } //2.校验验证码 String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone); String code = loginForm.getCode(); if(cacheCode==null ||!cacheCode.equals(code)){ //3.不一致,报错 return Result.fail("验证码错误!"); } //4.一致,根据手机号查询用户 select * from tb_user where phone = ? User user = query().eq("phone", phone).one(); //5.判断用户是否存在 if(user==null) { //6.不存在,创建新用户并保存 user=createUsrWithPhone(phone); } //保存用户信息到redis中 //1.随机生成token,作为登录令牌 String token = UUID.randomUUID().toString(true); //true代表isSimple,即不带中划线 //2.将User对象转为Hash存储 UserDTO userDTO=BeanUtil.copyProperties(user,UserDTO.class); Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString())); String tokenKey=LOGIN_USER_KEY+token; //7.存储 stringRedisTemplate.opsForHash().putAll(tokenKey,userMap); //设置token有效期 stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES); return Result.ok(); } private User createUsrWithPhone(String phone) { //1.创建用户 User user = new User(); user.setPhone(phone); user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10)); //2.保存用户 save(user); return user; } }
Redis代替session需要考虑的问题:
选择合适的数据结构
选择合适的key
选择合适的存储粒度
RefreshTokenInterceptor
public class RefreshTokenInterceptor implements HandlerInterceptor { private StringRedisTemplate stringRedisTemplate; public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){ this.stringRedisTemplate=stringRedisTemplate; } @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { //1.获取请求头中的token String token = request.getHeader("authorization"); //判断token是否为空 if(StrUtil.isBlank(token)){ return true; } String key=LOGIN_USER_KEY+token; //2.基于token获取redis中的用户 Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key); //3.判断用户是否存在 if(userMap.isEmpty()){ return true; } //5.存在,将查询到的Hash数据转为UserDTO对象 UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false); //6.保存用户到ThreadLocal UserHolder.saveUser(userDTO); //7.刷新token有效期 stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES); //8.放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
LoginInterceptor
public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { //判断是否需要拦截(ThreadLocal中是否有用户) if(UserHolder.getUser()==null){ //没有,需要拦截,设置状态码 response.setStatus(401); return false; } //有用户,则放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
配置添加拦截器
注意通过order控制拦截器执行顺序,order越小越先执行
@Configuration public class MvcConfig implements WebMvcConfigurer { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new LoginInterceptor()) .excludePathPatterns( "/user/code", "/user/login", "/blog/hot", "/shop/**", "/shop-type/**", "/voucher/**", "/upload/**").order(1); registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0); } }
缓存就是数据交换的缓冲区(称作Cache),是存储数据的临时地方,一般读写性能较高
/**
* 根据id查询商铺信息
* @param id 商铺id
* @return 商铺详情数据
*/
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
return shopService.queryById(id);
}
ShopServiceImpl:
@Service public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public Result queryById(Long id) { String key=CACHE_SHOP_KEY+id; //1.从redis查询商铺缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); //2.判断是否存在 if(StrUtil.isNotBlank(shopJson)) { //3.存在,直接返回 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } //4.不存在,根据id查询数据库 Shop shop = getById(id); //5.不存在,返回错误 if(shop==null){ return Result.fail("店铺不存在!"); } //6.存在,写入redis stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop)); //7.返回 return Result.ok(shop); } }
@Service public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService { @Autowired private ShopTypeMapper shopTypeMapper; @Autowired private StringRedisTemplate stringRedisTemplate; @Override public Result queryBatch() { String key="CACHE_SHOP_TYPE_KEY"; String jsonType = stringRedisTemplate.opsForValue().get(key); if(StrUtil.isNotBlank(jsonType)){ List<ShopType> shopTypes = JSONUtil.toList(jsonType, ShopType.class); return Result.ok(shopTypes); } List<ShopType> shopTypes = shopTypeMapper.selectList(new QueryWrapper<>()); if(shopTypes.isEmpty()){ return Result.fail("您查询的页面不存在!"); } stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shopTypes)); return Result.ok(shopTypes); } }
业务场景:
低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存
主动更新策略:
操作缓存和数据库时有三个问题需要考虑:
1.删除缓存还是更新缓存?
更新缓存:每次更新都更新缓存,无效写操作较多
删除缓存:更新数据库时让缓存失效,查询时再更新缓存(胜出)
2.如何保证缓存与数据库的操作的同时成功或失败?
单体系统:将缓存与数据库操作放在一个事务
分布式系统:利用TCC等分布式事务方案
3.先操作缓存还是先操作数据库?
总结:
案例:给查询商铺的缓存添加超时剔除和主动更新的策略
修改ShopController中的业务逻辑,满足下面的需求:
①根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
②根据id修改店铺时,先修改数据库,再删除缓存
更新操作:
@Override
@Transactional
public Result update(Shop shop) {
Long id=shop.getId();
if(id==null){
return Result.fail("店铺id不能为空!");
}
//1.更新数据库
updateById(shop);
//2.删除缓存
stringRedisTemplate.delete(CACHE_SHOP_KEY+shop.getId());
return Result.ok();
}
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库
编码解决商铺查询的缓存穿透问题
@Override public Result queryById(Long id) { String key=CACHE_SHOP_KEY+id; //1.从redis查询商铺缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); //2.判断是否存在 if(StrUtil.isNotBlank(shopJson)) { //3.存在,直接返回 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } //命中的是否为空值 if(shopJson!=null){ return Result.fail("店铺信息不存在!"); } //4.不存在,根据id查询数据库 Shop shop = getById(id); //5.不存在,返回错误 if(shop==null){ //将空值写入redis stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES); return Result.fail("店铺信息不存在!"); } //6.存在,写入redis stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES); //7.返回 return Result.ok(shop); }
总结:
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
缓存null值
布隆过滤
增强id的复杂度,避免被猜测id规律
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流
缓存雪崩是指同一时刻大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力
解决方案:
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略
给业务添加多级缓存
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:
互斥锁
逻辑过期
互斥锁和逻辑过期对比:
互斥锁和逻辑过期优缺点:
需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
public Shop queryWithMutex(Long id){ String key = CACHE_SHOP_KEY+id; //从redis查询商铺缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); //判断缓存是否命中 if(StrUtil.isNotBlank(shopJson)){ //命中则直接返回数据 return JSONUtil.toBean(shopJson, Shop.class); } //判断是否是缓存穿透 if(shopJson!=null){ return null; } //实现缓存重建 //1.获取互斥锁 String lockKey=LOCK_SHOP_KEY+id; try{ boolean isLock = tryLock(lockKey); //2.判断是否获取成功 if(!isLock) { //3.失败,则休眠并重试 Thread.sleep(50); queryWithMutex(id); } //4.成功,根据id查询数据库 Shop shop = getById(id); //5.不存在,返回错误 if(shop==null){ stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES); return null; } //6.存在,写入redis stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES); return shop; }catch(InterruptedException e){ throw new RuntimeException(e); }finally{ unlock(lockKey); } } private boolean tryLock(String key){ Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES); return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱 } private void unlock(String key){ stringRedisTemplate.delete(key); }
需求:修改根据id删除商铺的业务,基于逻辑过期的方式来解决缓存击穿问题
添加逻辑过期时间:
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
整体实现逻辑:
private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10); public Shop queryWithLogicalExpire(Long id){ String key=CACHE_SHOP_KEY+id; String shopJson = stringRedisTemplate.opsForValue().get(key); //缓存未命中 if(StrUtil.isBlank(shopJson)){ return null; } //命中,需要先把json反序列化为对象 RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); //判断是否过期 if(expireTime.isAfter(LocalDateTime.now())) { //未过期,直接返回店铺信息 return shop; } //已过期,需要缓存重建 String lockKey=LOCK_SHOP_KEY+id; //获取互斥锁 boolean isLock = tryLock(lockKey); //判断是否获取锁成功 if(isLock) { //成功,开启独立线程,实现缓存重建 CACHE_REBUILD_EXECUTOR.submit(()->{ try { saveShop2Redis(id, CACHE_SHOP_TTL); }catch(Exception e){ throw new RuntimeException(e); }finally{ unlock(lockKey); } }); } //返过期的商铺信息 } private boolean tryLock(String key){ Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES); return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱 } private void unlock(String key){ stringRedisTemplate.delete(key); } public void saveShop2Redis(Long id,Long expireSeconds){ //1.查询店铺数据 Shop shop = getById(id); //2.封装逻辑过期时间 RedisData redisData = new RedisData(); redisData.setData(shop); redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); //3.写入Redis stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData)); }
基于StringRedisTemplate封装一个缓存工具类,满足下列要求:
方法1:任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
@Component public class CacheClient { private final StringRedisTemplate stringRedisTemplate; public CacheClient(StringRedisTemplate stringRedisTemplate){ this.stringRedisTemplate=stringRedisTemplate; } public void set(String key, Object value, Long time, TimeUnit unit){ stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit); } public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){ //设置逻辑过期 RedisData redisData = new RedisData(); redisData.setData(value); redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time))); stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); } public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit){ String key=keyPrefix+id; String json = stringRedisTemplate.opsForValue().get(key); if(StrUtil.isNotBlank(json)){ return JSONUtil.toBean(json,type); } //判断命中的是否为空值 if(json!=null){ return null; } R r = dbFallback.apply(id); if(r==null){ stringRedisTemplate.opsForValue().set(key,"",time,unit); return null; } this.set(key,r,time,unit); return r; } private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10); public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){ String key=keyPrefix+id; String json = stringRedisTemplate.opsForValue().get(key); if(StrUtil.isBlank(json)){ return null; } RedisData redisData = JSONUtil.toBean(json, RedisData.class); R r = JSONUtil.toBean((JSONObject) redisData.getData(), type); LocalDateTime expireTime = redisData.getExpireTime(); //判断是否过期 if(expireTime.isAfter(LocalDateTime.now())){ //未过期,直接返回店铺信息 return r; } //已过期,需要缓存重建 String lockKey=LOCK_SHOP_KEY+id; boolean isLock = tryLock(lockKey); if(isLock){ CACHE_REBUILD_EXECUTOR.submit(()->{ try{ //重建缓存 //1.查询数据库 R apply = dbFallback.apply(id); //2.存入redis this.setWithLogicalExpire(key,apply,time,unit); }catch (Exception e){ throw new RuntimeException(e); }finally{ unlock(key); } }); } return r; } private boolean tryLock(String key){ Boolean flag=stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock(String key){ stringRedisTemplate.delete(key); } }
什么是缓存?
缓存的作用?
降低后端负载
提高读写响应速度
缓存的成本?
三种策略:
内存淘汰:redis自带的内存淘汰机制
过期淘汰:利用expire命令给数据设置过期时间
主动更新:主动完成数据库与缓存的同时更新
策略选择:
低一致性需求:内存淘汰或过期淘汰
高一致性需求:
主动更新为主
过期淘汰兜底
Cache Aside:缓存调用者在更新数据库的同时完成对缓存的更新
一致性良好、实现难度一般
Read/Write Through:缓存与数据库集成为一个服务,服务保证两者的一致性,对外暴露API接口,调用者调用API,无需知道自己操作的是数据库还是缓存,不关心一致性
一致性优秀、实现复杂、性能一般
Write Back:缓存调用者的CRUD都针对缓存完成。有独立线程异步将缓存数据写到数据库,实现最终一致
一致性差、性能好、实现复杂
Cache Aside模式选择
更新缓存还是删除缓存?
更新缓存会产生无效更新,并且存在较大的线程安全问题
删除缓存本质是延迟更新,没有无效更新,线程安全问题相对较低
先操作数据库还是缓存?
先更新数据,再删除缓存——在满足原子性的情况下,安全问题概率较低
先删除缓存,再更新数据库——安全问题概率极高
如何确保数据库与缓存操作原子性?
单体系统——利用事务机制
分布式系统——利用分布式事务机制
查询数据时
1.先查询缓存
2.如果缓存命中,直接返回
3.如果缓存未命中,则查询数据库
4.将数据库数据写入缓存
5.返回结果
修改数据库时:
1.先修改数据库
2.然后删除缓存
===>确保两者的原子性
产生原因:客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库
解决方案:
①缓存空对象:
思路:对于不存在的数据也在redis建立缓存,值为空,并设置一个较短的TTL时间
优点:实现简单,便于维护
缺点:额外的内存消耗、短期的数据不一致问题
②布隆过滤:
思路:利用布隆过滤算法,在请求进入Redis之前先判断是否存在,如果不存在则直接拒绝请求
优点:内存占用少
缺点:实现复杂、存在误判的可能性
③其他:
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流
产生原因:在同一时段大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力
解决方案:
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略
给业务添加多级缓存
产生原因:
热点key:①在某一时段被高并发访问 ②缓存重建耗时较长
热点key突然过期,因为缓存重建耗时长,在这段时间内大量请求落到数据库,带来巨大冲击
解决方案:
互斥锁:
思路:给缓存重建过程加锁,确保重建过程只有一个线程执行,其他线程等待
优点:①实现简单 ②没有额外内存消耗 ③一致性好
缺点:①等待导致性能下降
缺点:有死锁风险
逻辑过期:
思路:
①热点key缓存永不过期,而是设置一个逻辑过期时间,查询到数据时通过对逻辑过期时间判断,来决定是否需要重建缓存
②重建缓存也通过互斥锁来保证单线程执行
③重建缓存利用独立线程异步执行
④其他线程无需等待,直接查询到旧数据即可
优点:现成无需等待,性能较好
缺点:
①不保证一致性
②有额外内存消耗
③实现复杂
每个店铺都可以发布优惠券:
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就会存在一些问题:
id的规律性太明显
受单表数据量的限制
全局ID生成器:
全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息
ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,可以支持每秒产生2^32个不同的ID
@Component public class RedisIdWorker { //开始时间戳 2023-01-01 00:00:00 private static final long BEGIN_TIMESTAMP=1672531200L; private static final int COUNT_BITS=32; private StringRedisTemplate stringRedisTemplate; public RedisIdWorker(StringRedisTemplate stringRedisTemplate){ this.stringRedisTemplate=stringRedisTemplate; } public long nextId(String keyPrefix){ //1.生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp=nowSecond-BEGIN_TIMESTAMP; //2.生成序列号 //2.1 获取当前日期,精确到天 String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); //2.2 自增长 long count=stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date); //3.拼接并返回 return timestamp<<COUNT_BITS | count; } }
测试代码:
@Resource private RedisIdWorker redisIdWorker; private ExecutorService es= Executors.newFixedThreadPool(500); @Test void testIdWorker() throws InterruptedException { CountDownLatch latch = new CountDownLatch(300); Runnable task=()->{ for (int i = 0; i < 100; i++) { long id = redisIdWorker.nextId("order"); System.out.println("id="+id); } latch.countDown(); }; latch.countDown(); long begin = System.currentTimeMillis(); for (int i = 0; i < 300; i++) { es.submit(task); } latch.await(); long end=System.currentTimeMillis(); System.out.println("time="+(end-begin)); }
总结:
全局唯一ID生成策略:
UUID
Redis自增
snowflake算法
数据库自增
Redis自增ID策略:
每天一个key,方便统计订单量
ID构造是 时间戳+计数器
每个店铺都可以发布优惠券,分为评价券和特价劵。平价券可以任意购买,而特价券需要秒杀抢购:
表关系如下:
tb_voucher:优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher:优惠券的库存、开始抢购时间、结束抢购时间。特价优惠券才需要填写这些信息。
通过postman添加优惠券:
{
"shopId":1,
"title":"100元代金券",
"subTitle":"周一至周五均可使用",
"rules":"全场通用\\n无需预约\\n可无限叠加\\n不兑换、不找零\\n仅限堂食",
"payValue":8000,
"actualValue":10000,
"type":1,
"stock":100,
"beginTime":"2023-04-29T10:09:17",
"endTime":"2023-04-29T23:09:04"
}
下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
@Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Override @Transactional public Result seckillVoucher(Long voucherId) { //1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); //2.判断秒杀是否开始 if(voucher.getBeginTime().isAfter(LocalDateTime.now())) { //尚未开始 return Result.fail("秒杀尚未开始!"); } //3.判断秒杀是否结束 if(voucher.getEndTime().isBefore(LocalDateTime.now())){ //已经结束 return Result.fail("秒杀已经结束!"); } //4.判断库存是否充足 if(voucher.getStock()<1){ //库存不足 return Result.fail("库存不足!"); } //5.扣减库存 boolean success=seckillVoucherService .update() .setSql("stock=stock-1") .eq("voucher_id",voucherId) .update(); if(!success){ //扣减失败 return Result.fail("库存不足!"); } //6.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); //6.1 订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); //6.2 用户id Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); //6.3 代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); //7.返回订单id return Result.ok(orderId); } }
用jmeter模拟高并发:
模拟结果:
原因:
超卖问题就是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:
乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:
版本号法
CAS法
boolean success=seckillVoucherService
.update()
.setSql("stock=stock-1")
.eq("voucher_id",voucherId).eq("stock",voucher.getStock())
.update();
乐观锁问题:成功率太低
乐观锁优化:
boolean success=seckillVoucherService
.update()
.setSql("stock=stock-1")
.eq("voucher_id",voucherId).gt("stock",0)
.update();
运行结果:50%的失败率,订单恰好没有超卖
超卖这样的线程安全问题,解决方案有哪些?
1.悲观锁:添加同步锁,让线程串行执行
优点:简单粗暴
缺点:性能一般
2.乐观锁:不加锁,在更新时判断是否有其他线程在修改
优点:性能好
缺点:存在成功率低的问题
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
代码实现:
//一人一单
Long userId = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
//用户已经存在
return Result.fail("该用户已经购买过一次!");
}
对于高并发,解决线程安全问题:
(先提交事务,再释放锁)
第一步:添加依赖
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
第二步:暴露代理对象 @EnableAspectJAutoProxy(exposeProxy=true)
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}
}
第三步:对方法加锁
如果用this.createVoucherOrder(voucherId); 首先我们要知道事务如果想生效,需要Spring对该类做动态代理,拿到了代理对象,来对事务进行处理
这个this是没有事务功能的,因为拿到的目标对象(非代理对象)
所以需要拿到事务代理对象AopContext.currentProxy()
synchronized (userId.toString().intern()) {
//获取代理对象(事务)
IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
@Transactional public Result createVoucherOrder(Long voucherId) { //一人一单 Long userId = UserHolder.getUser().getId(); Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { //用户已经存在 return Result.fail("该用户已经购买过一次!"); } //5.扣减库存 boolean success = seckillVoucherService .update() .setSql("stock=stock-1") .eq("voucher_id", voucherId).gt("stock", 0) .update(); if (!success) { //扣减失败 return Result.fail("库存不足!"); } //6.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); //6.1 订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); //6.2 用户id voucherOrder.setUserId(userId); //6.3 代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); //7.返回订单id return Result.ok(orderId); }
压测结果:
因为集群模式下,每个JVM有各自的锁监视器
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
分布式锁的实现:
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见有三种:
实现分布式锁时需要实现的两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁
#添加锁,利用setnx的互斥特性
setnx lock thread1
#添加锁过期时间,避免服务宕机引起的死锁
expire lock 10
非阻塞:尝试一次,成功返回true,失败返回false
释放锁:
手动释放:del key
超时释放:获取锁时添加一个超时时间
需求:定义一个类,实现下面的接口,利用Redis实现分布式锁功能
@AllArgsConstructor public class SimpleRedisLock implements ILock{ private String name; private StringRedisTemplate stringRedisTemplate; private static final String KEY_PREFIX="lock:"; @Override public boolean tryLock(long timeoutSec) { long threadId = Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock() { //释放锁 stringRedisTemplate.delete(KEY_PREFIX+name); } }
优化一人一单:
Long userId=UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:"+userId,stringRedisTemplate);
boolean isLock = lock.tryLock(1200);
if(!isLock) {
//获取锁失败,返回错误或重试
return Result.fail("一个人只能下一单!");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
lock.unlock();
}
解决方案:在释放锁前判断这个锁是否还属于自己
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标识(可以用UUID表示)
2.在释放锁时现货区锁中的线程标识,判断是否与当前线程标识一致
@AllArgsConstructor public class SimpleRedisLock implements ILock{ private String name; private StringRedisTemplate stringRedisTemplate; private static final String KEY_PREFIX="lock:"; private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-"; @Override public boolean tryLock(long timeoutSec) { String threadId =ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock() { //获取线程标识 String threadId=ID_PREFIX + Thread.currentThread().getId(); //获取锁中的标识 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); //判断标识是否一致 if(threadId.equals(id)) { //释放锁 stringRedisTemplate.delete(KEY_PREFIX + name); } } }
判断和释放锁是两个动作,这中间可能会发生并发问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言。
这里重点介绍Redis提供的调用函数,语法如下:
#执行redis命令
redis.call('命令名称','key','其它参数',...)
#比如,我们要执行set name jack
redis.call('set','name','jack')
#如果我们要执行set name Rose,再执行get name,则脚本如下
redis.call('set','name','Rose')
local name = redis.call('get','name')
return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本常见命令如下:
执行无参脚本:
执行有参脚本:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入keys数组,其它参数会放入argv数组,在脚本中可以从keys和argv数组获取这些参数:
基于Redis的分布式锁
释放锁的业务流程:
1.获取锁中的线程标识
2.判断是否与指定的标识(当前线程标识)一致
3.如果一致则释放锁(删除)
4.如果不一致则什么都不做
-- 锁的key
local key = KEYS[1]
-- 当前线程标识
local threadId = ARGV[1]
-- 获取锁中的线程标识 get key
local id = redis.call('get',key)
--比较线程标识与锁中的标识是否一致
if(id == threadId) then
-- 释放锁
return redis.call('del',key)
end
return 0
脚本优化:
if(redis.call('get',KEYS[1])==ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0
需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedisTemplate调用Lua脚本的API如下:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT=new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock() {
//调用lua脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX+name),ID_PREFIX+Thread.currentThread().getId());
}
利用setnx nx ex获取锁,并设置过期时间,保存线程标识
特性:
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
第一步:引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.20.0</version>
</dependency>
第二步:配置Redisson
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.202.128:6379").setPassword("123321");
//创建RedissonClient对象
return Redisson.create(config);
}
}
3.使用Redisson分布式锁
使用jmeter压测通过,一个用户只能下一单:
redis无法实现可重入锁的原因:
Redisson可以实现可重入锁的原理:
lua脚本实现:
测试代码:
@SpringBootTest @Slf4j public class RedissonTest { @Autowired private RedissonClient redissonClient; private RLock lock; @BeforeEach void setup(){ lock=redissonClient.getLock("order"); } @Test void method1(){ boolean isLock = lock.tryLock(); if(!isLock){ log.error("获取锁失败……1"); return; } try{ log.info("获取锁成功……1"); method2(); log.info("开始执行业务……1"); }finally{ log.warn("准备释放锁……1"); lock.unlock(); } } void method2(){ boolean isLock = lock.tryLock(); if(!isLock){ log.error("获取锁失败……2"); return; } try{ log.info("获取锁成功……2"); log.info("开始执行业务……2"); }finally{ log.warn("准备释放锁……2"); lock.unlock(); } } }
运行截图:
查看源码实现:
trylock:
unlock:
Redisson分布式锁原理:
可重入:利用hash结构记录线程id和重入次数
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间
修改method1:
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
跟入源码:
注:redis.call('pttl',KEYS[1]);
中的pttl返回以毫秒为单位,而ttl返回以秒为单位
如果该线程拿到锁,则返回nil;否则返回剩余有效期
代码精华部分:
while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); }
怎么确保锁是因为业务结束而释放,而非阻塞导致的超时释放呢?
自动更新续期:
scheduleExpirationRenewal(threadId);
renewExpiration:更新有效期
实现永不过期的代码:
Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock {} expiration", getRawName(), e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
那么,何时取消计时刷新呢?
在锁释放的时候结束计时
redisson分布式锁主从一致问题:如果主节点宕机,而未将数据同步给从节点,可能会导致并发问题
解决方案:所有节点都变成独立的redis节点
总结:
1)不可重入的Redis分布式锁:
原理:利用setnx的互斥性,利用ex避免死锁;释放锁时判断线程标识
缺陷:不可重入、无法重试、锁超时失败
2)可重入的Redis分布式锁:
原理:利用hash结果,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
缺陷:redis宕机引起锁失效问题
3)Redisson的multiLock:
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂
怎么保证一人一单呢?Set集合
怎么确保代码执行的原子性?lua脚本
需求:
①新增秒杀优惠券的同时,将优惠券的信息保存到Redis中
@Override @Transactional public void addSeckillVoucher(Voucher voucher) { // 保存优惠券 save(voucher); // 保存秒杀信息 SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); //保存秒杀库存到Redis中 stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString()); }
新增优惠券:
②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-- 1.参数列表 -- 1.1 优惠券id local voucherId = ARGV[1] --1.2 用户id local userId = ARGV[2] -- 2.数据key -- 2.1 库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2 订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1 判断库存是否充足 get stockKey if(tonumber(redis.call('get',stockKey))<=0) then -- 3.2 库存不足,返回1 return 1 end -- 3.2 判断用户是否下单 SISMEMBER orderKey userId if(redis.call('sismember',orderKey,userId) == 1) then -- 3.3 存在,说明是重复下单,返回2 return 2 end -- 3.4 扣库存 incrby stockKey -1 redis.call('incrby',stockKey,-1) -- 3.5 下单(保存用户)sadd orderKey userId redis.call('sadd',orderKey,userId) return 0
③如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
@Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Autowired private StringRedisTemplate stringRedisTemplate; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } @Override public Result seckillVoucher(Long voucherId) { // 取出用户 Long userId = UserHolder.getUser().getId(); //1. 执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString() ); int r=result.intValue(); //2. 判断结果是否为0 if(r != 0) { //2.1 不为0,没有购买资格 return Result.fail(r==1 ? "库存不足" : "不能重复下单"); } //2.1 为0,有购买资格,把下单信息保存到阻塞队列 long orderId = redisIdWorker.nextId("order"); // TODO 保存阻塞队列 //3.返回订单id return Result.ok(orderId); }
④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
源代码有大改动,包括之前写好的createVoucherOrder,所以附上完整源码:
@Service @Slf4j public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private RedissonClient redissonClient; @Autowired private StringRedisTemplate stringRedisTemplate; private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<>(1024*1024); private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); private IVoucherOrderService proxy; @PostConstruct private void init(){ SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } private class VoucherOrderHandler implements Runnable{ @Override public void run() { while(true){ try { //1.获取队列中的订单信息 VoucherOrder voucherOrder = orderTasks.take();//没有任务时会自动阻塞 //2.创建订单 handleVoucherOrder(voucherOrder); } catch (InterruptedException e) { log.error("处理订单异常",e); } } } } private void handleVoucherOrder(VoucherOrder voucherOrder) { //1.获取用户 Long userId=voucherOrder.getUserId(); //2.创建锁对象 RLock lock=redissonClient.getLock("lock:order:" + userId); //3.获取锁 boolean isLock = lock.tryLock(); //4.判断是否获取锁成功 if(!isLock){ log.error("不允许重复下单"); return; } try{ proxy.createVoucherOrder(voucherOrder); }finally{ lock.unlock(); } } private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } @Override public Result seckillVoucher(Long voucherId) { // 取出用户 Long userId = UserHolder.getUser().getId(); //1. 执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString() ); int r=result.intValue(); //2. 判断结果是否为0 if(r != 0) { //2.1 不为0,没有购买资格 return Result.fail(r==1 ? "库存不足" : "不能重复下单"); } //2.1 为0,有购买资格,把下单信息保存到阻塞队列 long orderId = redisIdWorker.nextId("order"); // TODO 保存阻塞队列 VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); orderTasks.add(voucherOrder); //3.返回订单id return Result.ok(orderId); } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { //一人一单 Long userId = voucherOrder.getUserId(); Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); if (count > 0) { //用户已经存在 log.error("用户已经购买过一次"); return; } //5.扣减库存 boolean success = seckillVoucherService .update() .setSql("stock=stock-1") .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) .update(); if (!success) { //扣减失败 log.error("库存不足"); } save(voucherOrder); } }
秒杀业务的优化思路是什么?
①先利用Redis完成库存余量、一人一单判断,完成抢单业务
②再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
Redis提供了三种不同的方式来实现消息队列:
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果
队列的入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用的是BRPOP或者BLPOP来实现阻塞效果。
示例:
基于List的消息队列有哪些优缺点?
优点:
缺点:
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg:向一个频道发送消息
PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道
pattern:
?代表一个字符
[ae] 代表可以是a也可以是e
示例:
基于PubSub的消息队列有哪些优缺点?
优点:
缺点:
不支持数据持久化
无法避免消息丢失
消息堆积有上限,超出时数据丢失
Stream是Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列
示例1:
===>结论 :基于stream的消息队列,消息会被持久化存储,可以被多个消费者读取,也可以被一个消费者读取多次
示例2:阻塞等待,block 0 代表一直阻塞直到有新消息的到来
消费者应用:
bug:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
消息漏读:
STREAM类型消息队列的XREAD命令特点:
消息可回溯
一个消息可以被多个消费者读取
可以阻塞读取
有消息漏读的风险
基于Stream的消费队列—消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
==>队列中的消息如果还想用,ID从0开始;如果不想用了,ID从$开始
示例:
Java代码实现:
STREAM类型消息队列的XREADGROUP命令特点:
消息可回溯
可以多消费者争抢消息,加快消费速度
可以阻塞读取
没有消息漏读的风险
有消息确认机制,保证消息至少被消费一次
总结:
需求:
①创建一个Stream类型的消息队列,名为stream.orders
参数MKSTREAM,在创建组的时候,如果消息队列不存在,则自动创建组和队列
②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
添加orderId:
--1.3 订单id
local orderId = ARGV[3]
发送消息到队列中
-- 3.6 发送消息到队列中
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
改造Java代码:
@Override public Result seckillVoucher(Long voucherId) { // 取出用户 Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); //1. 执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString(),String.valueOf(orderId) ); int r=result.intValue(); //2. 判断结果是否为0 if(r != 0) { //2.1 不为0,没有购买资格 return Result.fail(r==1 ? "库存不足" : "不能重复下单"); } proxy=(IVoucherOrderService) AopContext.currentProxy(); //3.返回订单id return Result.ok(orderId); }
③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
private class VoucherOrderHandler implements Runnable{ String queueName = "stream.orders"; @Override public void run() { while(true) { try { //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order > List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); //2.判断消息获取是否成功 if (list == null || list.isEmpty()) { // 如果获取失败,说明没有消息,继续下一次循环 continue; } // 解析消息中的订单信息 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> values = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true); //3.创建订单 handleVoucherOrder(voucherOrder); //4.ACK确认 SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId()); } catch (Exception e) { log.error("处理订单异常", e); handlePendingList(); } } } private void handlePendingList(){ while(true){ try{ //1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(queueName, ReadOffset.from("0")) ); //2.判断消息是否获取成功 if(list == null || list.isEmpty()){ //没有消息,说明pending-list中没有异常消息,结束循环 break; } //3.解析消息中的订单信息 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> values = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true); //4.如果获取成功,可以下单 handleVoucherOrder(voucherOrder); //5.ACK确认 stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId()); }catch(Exception e){ log.error("处理pending-list订单异常",e); try { Thread.sleep(20); } catch (InterruptedException ex) { ex.printStackTrace(); } } } } }
探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:
tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
tb_blog_comments:其他用户对探店笔记的评价
第一步:将SystemConstants的常量改为部署在nginx项目下的imgs目录
public static final String IMAGE_UPLOAD_DIR = "D:\\lesson\\nginx-1.18.0\\html\\hmdp\\imgs\\";
Controller接口:
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id){
return blogService.queryBlogById();
}
Service实现:
@Override public Result queryBlogById(Long id) { //1.查询blog Blog blog = getById(id); if(blog == null){ return Result.fail("笔记不存在!"); } //2.查询blog有关的用户 queryBlogUser(blog); return Result.ok(blog); } private void queryBlogUser(Blog blog) { Long userId = blog.getUserId(); User user = userService.getById(userId); blog.setName(user.getNickName()); blog.setIcon(user.getIcon()); }
需求:
同一个用户只能点赞一次,再次点击则取消点赞
如果当前用户已经点赞,则点赞按钮高亮显示(前段已实现,判断字段Blog类的isLike属性)
实现步骤:
①给Blog类中添加一个isLike字段,标识是否被当前用户点赞
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;
②修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1
③修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值給isLike字段
④修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}
@Override public Result queryHotBlog(Integer current) { // 根据用户查询 Page<Blog> page = query() .orderByDesc("liked") .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE)); // 获取当前页数据 List<Blog> records = page.getRecords(); // 查询用户 records.forEach(blog->{ this.queryBlogUser(blog); this.isBlogLiked(blog); }); return Result.ok(records); } @Override public Result queryBlogById(Long id) { //1.查询blog Blog blog = getById(id); if(blog == null){ return Result.fail("笔记不存在!"); } //2.查询blog有关的用户 queryBlogUser(blog); //3.查询blog是否被点赞 isBlogLiked(blog); return Result.ok(blog); } private void isBlogLiked(Blog blog) { Long userId = UserHolder.getUser().getId(); String key = "blog:liked:" + blog.getId(); Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString()); blog.setIsLike(BooleanUtil.isTrue(isMember)); } @Override public Result likeBlog(Long id) { //1. 获取登录用户 Long userId = UserHolder.getUser().getId(); //2. 判断当前登录用户是否已经点赞 String key = "blog:liked:" + id; Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString()); if(BooleanUtil.isFalse(isMember)) { //3. 如果未点赞,可以点赞 //3.1 数据库点赞数+1 boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update(); //3.2 保存用户到Redis的set集合 if(isSuccess){ stringRedisTemplate.opsForSet().add(key,userId.toString()); } }else { //4. 如果已经点赞,则取消点赞 //4.1 数据库点赞数-1 boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update(); //4.2 把用户从Redis的set集合移除 if (isSuccess) { stringRedisTemplate.opsForSet().remove(key, userId.toString()); } } return Result.ok(); }
需求:按照点赞时间先后排序,返回Top5的用户
一人只能点赞一次功能实现:
因为SortedSet中没有isMember的判断,所以在添加元素的时候加上时间戳;查询的时候如果对应的元素没有时间戳,则代表集合中没有这个元素
private void isBlogLiked(Blog blog) { UserDTO user = UserHolder.getUser(); if(user==null){ return; } Long userId = UserHolder.getUser().getId(); String key = BLOG_LIKED_KEY + blog.getId(); Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString()); blog.setIsLike(score!=null); } @Override public Result likeBlog(Long id) { //1. 获取登录用户 Long userId = UserHolder.getUser().getId(); //2. 判断当前登录用户是否已经点赞 String key = BLOG_LIKED_KEY + id; Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString()); if(score==null) { //3. 如果未点赞,可以点赞 //3.1 数据库点赞数+1 boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update(); //3.2 保存用户到Redis的set集合 zadd key value score if(isSuccess){ stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis()); } }else { //4. 如果已经点赞,则取消点赞 //4.1 数据库点赞数-1 boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update(); //4.2 把用户从Redis的set集合移除 if (isSuccess) { stringRedisTemplate.opsForZSet().remove(key, userId.toString()); } } return Result.ok(); }
查询点赞排行前五:
@Override public Result queryBlogLikes(Long id) { String key=BLOG_LIKED_KEY+id; //1. 查询top5的点赞用户 zrange key 0 4 Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4); if(top5 == null || top5.isEmpty()){ return Result.ok(Collections.emptyList()); } //2. 解析出其中的用户id List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList()); String idStr = StrUtil.join(",", ids); //3. 根据用户id查询用户 List<UserDTO> userDTOs = userService .query() .in("id",ids) .last("order by field(id,"+idStr+")") .list() .stream() .map(user -> BeanUtil.copyProperties(user, UserDTO.class)) .collect(Collectors.toList()); //4. 返回 return Result.ok(userDTOs); }
需求:基于该表数据结构,实现两个接口:
①关注和取关接口
②判断是否关注的接口
Controller实现:
@RestController
@RequestMapping("/follow")
public class FollowController {
@Resource
private IFollowService followService;
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id")Long followUserId,@PathVariable("isFollow")Boolean isFollow){
return followService.follow(followUserId,isFollow);
}
@GetMapping("/or/not/{id}")
public Result follow(@PathVariable("id")Long followUserId){
return followService.isFollow(followUserId);
}
}
ServiceImpl具体实现:
@Service public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService { @Override public Result follow(Long followUserId, Boolean isFollow) { // 获取登录用户 Long userId = UserHolder.getUser().getId(); //1.判断到底是关注还是取关 if (isFollow) { //2.关注,新增数据 Follow follow = new Follow(); follow.setUserId(userId); follow.setFollowUserId(followUserId); save(follow); } else { //3.取关,删除 remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId)); } return Result.ok(); } @Override public Result isFollow(Long followUserId) { //1.获取登录用户 Long userId = UserHolder.getUser().getId(); //2.查询是否关注 Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count(); //3.判断 return Result.ok(count>0); } }
将下面的代码插入到UserController:
@GetMapping("/{id}")
public Result queryUserById(@PathVariable("id")Long userId){
User user = userService.getById(userId);
if(user == null){
return Result.ok();
}
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
return Result.ok(userDTO);
}
将下面的代码插入到BlogController
@GetMapping("/of/user")
public Result queryBlogByUserId(
@RequestParam(value="current",defaultValue = "1")Integer current,
@RequestParam("id")Long id){
Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
List<Blog> records = page.getRecords();
return Result.ok(records);
}
要实现共同关注的查找,可以考虑集合set的交集
需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友
①修改follow代码,将用户和关注的用户加入集合
@Override public Result follow(Long followUserId, Boolean isFollow) { // 获取登录用户 Long userId = UserHolder.getUser().getId(); String key="follows:"+userId; //1.判断到底是关注还是取关 if (isFollow) { //2.关注,新增数据 Follow follow = new Follow(); follow.setUserId(userId); follow.setFollowUserId(followUserId); boolean isSuccess = save(follow); //判断是否关注成功 if(isSuccess){ //如果关注成功,把关注用户的id放入redis的set集合 stringRedisTemplate.opsForSet().add(key,followUserId.toString()); } } else { //3.取关,删除 boolean isSuccess = remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", followUserId)); if (isSuccess) { stringRedisTemplate.opsForSet().remove(key, followUserId.toString()); } } return Result.ok(); }
②求共同关注
controller:
@GetMapping("/commons/{id}")
public Result followCommons(@PathVariable("id") Long id){
return followService.followCommons(id);
}
具体实现:
@Resource private IUserService userService; @Override public Result followCommons(Long id) { //1.获取当前用户 Long userId = UserHolder.getUser().getId(); String key="follows:"+userId; //2.求交集 String key2="follows:"+id; Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2); if(intersect==null || intersect.isEmpty()){ //无交集 return Result.ok(Collections.emptyList()); } //3.解析id集合 List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList()); //4.查询用户 List<UserDTO> users = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList()); return Result.ok(users); }
运行效果:
关注推送也叫作Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无线下拉刷新获取新的信息。
Feed流产品有两种常见的模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈。
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣的信息来吸引用户
本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:
①拉模式
②推模式
③推拉结合
拉模式:也叫读扩散
每次发送消息的时候都会加上一个时间戳,当用户打开个人页面,就会去关注的人的发件箱里拉取消息并按时间戳排序。缺点是耗时较长,优点是节省内存。
推模式:也叫作写扩散。
关注的博主发消息的时候会直接把消息推送到个人主页,并排好序。在本人阅读个人主页的时候,无需等待拉取信息等。优点是延时低,缺点是消耗内存。
推拉结合模式:也叫读写混合,兼具推和拉两种模式的优点。
对于普通人,发送的消息可以直接推送到粉丝的收件箱。
对于大V,发送的消息,对于活跃粉丝使用推模式,对于普通粉丝使用拉模式。
总结:
需求:
①修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
②收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
③查询收件箱数据时,可以实现分页查询
Feed流的滚动分页:
Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。
接口:
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
return blogService.saveBlog(blog);
}
具体实现:
@Resource private IFollowService followService; @Override public Result saveBlog(Blog blog) { //1.获取登录用户 UserDTO user = UserHolder.getUser(); blog.setUserId(user.getId()); //2.保存探店笔记 boolean isSuccess = save(blog); if(!isSuccess){ return Result.fail("新增笔记失败!"); } //3.查询笔记作者的所有粉丝 List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list(); //4.返回id for(Follow follow:follows){ //4.1 获取粉丝id Long userId = follow.getUserId(); //4.2 推送 String key="feed:"+userId; stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis()); } return Result.ok(blog.getId()); }
需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:
以z1为例复习相应命令:
①倒序按范围查询
②混乱插入(此时插入一条数据,继续查询)
③解决方法:可以按照分数查询
zrevrangebyscore key max min withscores limit offsest count
其中offset中0代表小于等于max的第一条,如果要实现小于max的第一条,则应将offset置1
count代表一次查询多少条(数量)
④存在的问题:如果value不一样,但是分数值一样,可能会查询到重复的数据
将m7的分数值改为6:
此时再按照记忆的最后一条分数去做查询,会发现数据重复:
⑤总结:
滚动分页查询参数:
max:当前时间戳或者上一次查询的最小时间戳
min:0
offset:0 或者 在上一次的结果中,与最小值一样的元素的个数
count:3(与前端约定好)
①定义滚动查询结果的实体类
@Data
public class ScrollResult {
private List<?> list;
private Long minTime;
private Integer offset;
}
②定义接口
@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId")Long max,@RequestParam(value = "offset",defaultValue = "0")Integer offset){
return blogService.queryBlogOfFollow(max,offset);
}
③具体实现
@Override public Result queryBlogOfFollow(Long max, Integer offset) { //1.获取当前用户 Long userId = UserHolder.getUser().getId(); //2.查询收件箱 String key=FEED_KEY+userId; Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2); //3.非空判断 if(typedTuples==null || typedTuples.isEmpty()){ return Result.ok(); } //4.解析数据:blogId,minTime(时间戳),offset ArrayList<Object> ids = new ArrayList<>(typedTuples.size()); long minTime=0; int os=1; for(ZSetOperations.TypedTuple<String> tuple:typedTuples) { //4.1 获取id ids.add(Long.valueOf(tuple.getValue())); //4.2 获取分数(时间戳) long time = tuple.getScore().longValue(); if (time == minTime) { os++; } else { minTime = time; os = 1; } } //5.根据id查询blog String idStr = StrUtil.join(",", ids); List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list(); for (Blog blog : blogs) { //5.1 查询blog有关的用户 queryBlogUser(blog); //5.2 查询blog是否被点赞 isBlogLiked(blog); } //6.封装并返回 ScrollResult r = new ScrollResult(); r.setList(blogs); r.setOffset(os); r.setMinTime(minTime); return Result.ok(r); }
案例:练习Redis的GEO功能
需求:
1.添加下面几条数据:
北京南站(116.378248 39.865275)
北京站(116.42803 39.903738)
北京西站(116.322287 39.893729)
查看redis客户端:
==>geo底层采用ZSET实现
2.计算北京西站到北京南站的距离
==>默认单位是米
3.搜索天安门(116.397904 39.909005)附近10km内的所有火车站,并按照距离升序排序
按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可
导入数据(通过单元测试):
@Test void loadShopData(){ //1.查询店铺信息 List<Shop> list = shopService.list(); //2.把店铺按照typeId分组,id一致的放到一个集合 Map<Long,List<Shop>> map=list.stream().collect(Collectors.groupingBy(Shop::getTypeId)); //3.分批完成写入Redis for(Map.Entry<Long,List<Shop>> entry:map.entrySet()){ //3.1 获取类型id Long typeId = entry.getKey(); String key="shop:geo:"+typeId; //3.2 获取同类型的店铺集合 List<Shop> value = entry.getValue(); //3.3 写入redis List<RedisGeoCommands.GeoLocation<String>> locations=new ArrayList<>(value.size()); for(Shop shop:value){ locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(),shop.getY()))); } stringRedisTemplate.opsForGeo().add(key,locations); } }
首先,项目中redis相关的依赖版本太低,不支持GEOSEARCH方法,需要更换版本
我们将5.3.7和1.3.9版本排除,观察代码:
然后手动引入新版本:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
</dependency>
接口:
/**
* 根据商铺类型分页查询商铺信息
* @param typeId 商铺类型
* @param current 页码
* @return 商铺列表
*/
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value = "x",required = false) Double x,
@RequestParam(value = "y",required = false) Double y
) {
return shopService.queryShopByType(typeId,current,x,y);
}
具体实现:
@Override public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) { //1.判断是否需要根据坐标查询 if(x==null || y==null){ Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, DEFAULT_PAGE_SIZE)); return Result.ok(page.getRecords()); } //2.计算分页参数 int from=(current-1)*DEFAULT_PAGE_SIZE; int end=current*DEFAULT_PAGE_SIZE; //3.查询redis、按照距离排序、分页 结果:shopId、distance String key=SHOP_GEO_KEY+typeId; GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() .search( key, GeoReference.fromCoordinate(x, y), new Distance(5000), RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end) ); //4.解析出id if(results==null){ return Result.ok(Collections.emptyList()); } List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent(); if(list.size()<=from){ return Result.ok(Collections.emptyList()); } //4.1 截取from ~ end的那部分 List<Long> ids=new ArrayList<>(list.size()); Map<String,Distance> distanceMap=new HashMap<>(list.size()); list.stream().skip(from).forEach(result->{ //4.2 获取店铺id String shopIdStr = result.getContent().getName(); ids.add(Long.valueOf(shopIdStr)); //4.3 获取距离 Distance distance = result.getDistance(); distanceMap.put(shopIdStr,distance); }); //5.根据id查询店铺Shop String idStr = StrUtil.join(",", ids); List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list(); for(Shop shop:shops){ shop.setDistance(distanceMap.get(shop.getId().toString()).getValue()); } //6.返回 return Result.ok(shops); }
效果:
注意,在stream流中跳过了一部分商铺,很有可能导致跳过以后没有商铺可查,从而出现问题,所以需要加上如下判断:
if(list.size()<=from){
return Result.ok(Collections.emptyList());
}
我们按月来统计用户签到信息,签到记录为1,未签到则记录为0
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)
Redis中就是利用String类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是2^32个bit位。
练习:
BITFIELD bm1 GET u2 0含义:读取bm1,以无符号十进制的方式读取两位(u2),从第一位开始读取。返回值3代表读取的比特位是11
BITPOS bm1 0 :代表查找第一个0出现的位置
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
提示:因为BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了。(Redis的字符串)
接口:
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}
具体实现:
@Override
public Result sign() {
//1.获取当前登录的用户
Long userId = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
//3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key=USER_SIGN_KEY+userId+keySuffix;
//4.获取今天是本月的第几天
int dayOfMonth=now.getDayOfMonth();
//5.写入Redis
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
return Result.ok();
}
测试:
Q1:什么叫做连续签到天数?
A1:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。
Q2:如何得到本月到今天为止的所有签到数据?
BITFIELD key GET u[dayOfMonth] 0
Q3:如何从后向前遍历每个bit位?
与1做与运算,就能得到最后一个bit位
随后右移1位,下一个bit位就成为了最后一个bit位
案例:实现签到统计功能
需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数
接口:
@GetMapping("/sign/count")
public Result signCount(){
return userService.signCount();
}
具体实现:
@Override public Result signCount() { //1.获取当前登录用户 Long userId = UserHolder.getUser().getId(); //2.获取日期 LocalDateTime now = LocalDateTime.now(); //3.拼接key String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM")); String key=USER_SIGN_KEY+userId+keySuffix; //4.获取今天是本月的第几天 int dayOfMonth = now.getDayOfMonth(); //5.获取本月截止今天为止的所有签到记录,返回的是一个十进制的数字 List<Long> result = stringRedisTemplate.opsForValue().bitField(key, BitFieldSubCommands .create() .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)) .valueAt(0)); if(result==null || result.isEmpty()){ //没有任何签到结果 return Result.ok(0); } Long num = result.get(0); if(num==null || num==0){ return Result.ok(0); } //6.循环遍历 int count=0; while(true){ //6.1 让这个数字与1做与运算,得到数字的最后一个bit位 if((num&1)==0){ //如果为0,说明未签到,结束 break; }else{ //如果不为0,说明已签到,计数器+1 count++; } num >>>= 1; // >>>代表无符号右移 } return Result.ok(count); }
测试:返回值为3
实际上也确实是3个连续的1:
测试通过!
UV:全称Unique Visitor,也叫独立访问量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次
PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
UV统计在服务端做会比较麻烦,因为要判断用户是否已经统计过了,要将统计过的用户信息保存,但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。
命令练习:
我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何:
@Test void testHyperLogLog(){ String[] values=new String[1000]; int j=0; for (int i = 0; i < 1000000; i++) { j=i%1000; values[j]="user_"+i; if(j==999){ //发送到Redis stringRedisTemplate.opsForHyperLogLog().add("hl2",values); } } //统计数量 Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2"); System.out.println("count="+count); }
运行结果:
内存消耗:
运行前内存:
运行后内存:
1929640-1505552= 424088
424088/1024/1024=0.4M
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。