赞
踩
发送验证码功能:
- @Override
- public Result sendCode(String phone, HttpSession session) {
- //1.校验手机号
- if(RegexUtils.isPhoneInvalid(phone)){
- //2.如果不符合,返回错误信息
- return Result.fail("手机号格式错误!");
- }
- //3.符合,生成验证码
- String code = RandomUtil.randomNumbers(6);
- //4.保存验证码到session
- session.setAttribute("code",code);
- //5.发送验证码
- log.debug("发送短信验证码成功,验证码:{}"+code);
- //返回ok
- return Result.ok();
- }
登录功能:
登录表单的实体类:
- @Data
- public class LoginFormDTO {
- private String phone;
- private String code;
- private String password;
- }
登录逻辑代码实现:
- @Override
- public Result login(LoginFormDTO loginForm, HttpSession session) {
- //1.校验手机号
- String phone = loginForm.getPhone();
- if(RegexUtils.isPhoneInvalid(phone)){
- //如果不符合,返回错误信息
- return Result.fail("手机号格式错误!");
- }
- //2.校验验证码
- Object cacheCode = session.getAttribute("code");
- String code = loginForm.getCode();
- if(cacheCode==null ||! cacheCode.toString().equals(code)){
- //3.不一致,报错
- return Result.fail("验证码错误!");
- }
- //4.一致,根据手机号查询用户 select * from tb_user where phone = ?
- User user = query().eq("phone", phone).one();
- //5.判断用户是否存在
- if(user==null) {
- //6.不存在,创建新用户并保存
- user=createUsrWithPhone(phone);
- }
- //7.保存用户信息到session中
- session.setAttribute("user",user);
- return null;
- }
-
- private User createUsrWithPhone(String phone) {
- //1.创建用户
- User user = new User();
- user.setPhone(phone);
- user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
- //2.保存用户
- save(user);
- return user;
- }
ThreadLocal
叫做本地线程变量,意思是说,ThreadLocal
中填充的的是当前线程的变量,该变量对其他线程而言是封闭且隔离的,ThreadLocal
为变量在每个线程中创建了一个副本,这样每个线程都可以访问自己内部的副本变量。
注意,为了隐藏用户敏感信息,也为了节省ThreadLocal的空间,需要将User转为UserDTO返回给前端。
可以通过hutool工具类,在UserService里修改:
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
- public class UserHolder {
- private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
-
- public static void saveUser(UserDTO user){
- tl.set(user);
- }
-
- public static UserDTO getUser(){
- return tl.get();
- }
-
- public static void removeUser(){
- tl.remove();
- }
- }
拦截器:
- public class LoginInterceptor implements HandlerInterceptor {
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- //1.获取session
- HttpSession session = request.getSession();
- //2.获取session中的用户
- Object user = session.getAttribute("user");
- //3.判断用户是否存在
- if(user==null) {
- //4.不存在,拦截,返回401状态码,代表未授权
- response.setStatus(401);
- return false;
- }
- //5.存在,保存用户信息到ThreadLocal
- UserHolder.saveUser((UserDTO) user);
- //6.放行
- return true;
- }
-
- @Override
- public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
- UserHolder.removeUser();
- }
- }
添加拦截器:
- @Configuration
- public class MvcConfig implements WebMvcConfigurer {
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- registry.addInterceptor(new LoginInterceptor())
- .excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/voucher/**","/upload/**");
- }
- }
登录并返回:
- @GetMapping("/me")
- public Result me(){
- // TODO 获取当前登录的用户并返回
- UserDTO user = UserHolder.getUser();
- return Result.ok(user);
- }
session共享问题:多态Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题
session的替代方案应该满足:
数据共享
内存存储
key、value结构
===>redis
拦截器修改:
- public class LoginInterceptor implements HandlerInterceptor {
-
- private StringRedisTemplate stringRedisTemplate;
-
- public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
- this.stringRedisTemplate=stringRedisTemplate;
- }
-
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- //1.获取请求头中的token
- String token = request.getHeader("authorization");
- //判断token是否为空
- if(StrUtil.isBlank(token)){
- response.setStatus(401);
- return false;
- }
- String key=LOGIN_USER_KEY+token;
- //2.基于token获取redis中的用户
- Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
- //3.判断用户是否存在
- if(userMap.isEmpty()){
- //不存在,拦截,返回401状态码
- response.setStatus(401);
- return false;
- }
- //5.存在,将查询到的Hash数据转为UserDTO对象
- UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
- //6.保存用户到ThreadLocal
- UserHolder.saveUser(userDTO);
- //7.刷新token有效期
- stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
- //8.放行
- return true;
- }
-
- @Override
- public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
- UserHolder.removeUser();
- }
- }
MvcConfig修改:
- @Configuration
- public class MvcConfig implements WebMvcConfigurer {
-
- @Resource
- private StringRedisTemplate stringRedisTemplate;
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
- .excludePathPatterns(
- "/user/code",
- "/user/login",
- "/blog/hot",
- "/shop/**",
- "/shop-type/**",
- "/voucher/**",
- "/upload/**");
- }
- }
UserServiceImpl修改:
- @Service
- public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
-
- @Resource
- private StringRedisTemplate stringRedisTemplate;
-
- @Override
- public Result sendCode(String phone, HttpSession session) {
- //1.校验手机号
- if(RegexUtils.isPhoneInvalid(phone)){
- //2.如果不符合,返回错误信息
- return Result.fail("手机号格式错误!");
- }
- //3.符合,生成验证码
- String code = RandomUtil.randomNumbers(6);
- //4.保存验证码到redis中
- stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
- //5.发送验证码
- log.debug("发送短信验证码成功,验证码:{}"+code);
- //返回ok
- return Result.ok();
- }
-
- @Override
- public Result login(LoginFormDTO loginForm, HttpSession session) {
- //1.校验手机号
- String phone = loginForm.getPhone();
- if(RegexUtils.isPhoneInvalid(phone)){
- //如果不符合,返回错误信息
- return Result.fail("手机号格式错误!");
- }
- //2.校验验证码
- String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
- String code = loginForm.getCode();
- if(cacheCode==null ||!cacheCode.equals(code)){
- //3.不一致,报错
- return Result.fail("验证码错误!");
- }
- //4.一致,根据手机号查询用户 select * from tb_user where phone = ?
- User user = query().eq("phone", phone).one();
- //5.判断用户是否存在
- if(user==null) {
- //6.不存在,创建新用户并保存
- user=createUsrWithPhone(phone);
- }
- //保存用户信息到redis中
- //1.随机生成token,作为登录令牌
- String token = UUID.randomUUID().toString(true); //true代表isSimple,即不带中划线
- //2.将User对象转为Hash存储
- UserDTO userDTO=BeanUtil.copyProperties(user,UserDTO.class);
- Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
- String tokenKey=LOGIN_USER_KEY+token;
- //7.存储
- stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
- //设置token有效期
- stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
- return Result.ok();
- }
-
- private User createUsrWithPhone(String phone) {
- //1.创建用户
- User user = new User();
- user.setPhone(phone);
- user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
- //2.保存用户
- save(user);
- return user;
- }
- }
Redis代替session需要考虑的问题:
选择合适的数据结构
选择合适的key
选择合适的存储粒度
RefreshTokenInterceptor
- public class RefreshTokenInterceptor implements HandlerInterceptor {
-
- private StringRedisTemplate stringRedisTemplate;
-
- public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
- this.stringRedisTemplate=stringRedisTemplate;
- }
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
-
- //1.获取请求头中的token
- String token = request.getHeader("authorization");
- //判断token是否为空
- if(StrUtil.isBlank(token)){
- return true;
- }
- String key=LOGIN_USER_KEY+token;
- //2.基于token获取redis中的用户
- Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
- //3.判断用户是否存在
- if(userMap.isEmpty()){
- return true;
- }
- //5.存在,将查询到的Hash数据转为UserDTO对象
- UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
- //6.保存用户到ThreadLocal
- UserHolder.saveUser(userDTO);
- //7.刷新token有效期
- stringRedisTemplate.expire(key,LOGIN_USER_TTL, TimeUnit.MINUTES);
- //8.放行
- return true;
- }
-
- @Override
- public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
- UserHolder.removeUser();
- }
- }
LoginInterceptor
- public class LoginInterceptor implements HandlerInterceptor {
-
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- //判断是否需要拦截(ThreadLocal中是否有用户)
- if(UserHolder.getUser()==null){
- //没有,需要拦截,设置状态码
- response.setStatus(401);
- return false;
- }
- //有用户,则放行
- return true;
- }
-
- @Override
- public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
- UserHolder.removeUser();
- }
- }
配置添加拦截器
注意通过order控制拦截器执行顺序,order越小越先执行
- @Configuration
- public class MvcConfig implements WebMvcConfigurer {
-
- @Resource
- private StringRedisTemplate stringRedisTemplate;
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- registry.addInterceptor(new LoginInterceptor())
- .excludePathPatterns(
- "/user/code",
- "/user/login",
- "/blog/hot",
- "/shop/**",
- "/shop-type/**",
- "/voucher/**",
- "/upload/**").order(1);
- registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
- }
- }
缓存就是数据交换的缓冲区(称作Cache),是存储数据的临时地方,一般读写性能较高
- /**
- * 根据id查询商铺信息
- * @param id 商铺id
- * @return 商铺详情数据
- */
- @GetMapping("/{id}")
- public Result queryShopById(@PathVariable("id") Long id) {
- return shopService.queryById(id);
- }
ShopServiceImpl:
- @Service
- public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
-
- @Resource
- private StringRedisTemplate stringRedisTemplate;
-
- @Override
- public Result queryById(Long id) {
- String key=CACHE_SHOP_KEY+id;
- //1.从redis查询商铺缓存
- String shopJson = stringRedisTemplate.opsForValue().get(key);
- //2.判断是否存在
- if(StrUtil.isNotBlank(shopJson)) {
- //3.存在,直接返回
- Shop shop = JSONUtil.toBean(shopJson, Shop.class);
- return Result.ok(shop);
- }
- //4.不存在,根据id查询数据库
- Shop shop = getById(id);
- //5.不存在,返回错误
- if(shop==null){
- return Result.fail("店铺不存在!");
- }
- //6.存在,写入redis
- stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));
- //7.返回
- return Result.ok(shop);
- }
- }
- @Service
- public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
-
- @Autowired
- private ShopTypeMapper shopTypeMapper;
- @Autowired
- private StringRedisTemplate stringRedisTemplate;
- @Override
- public Result queryBatch() {
- String key="CACHE_SHOP_TYPE_KEY";
- String jsonType = stringRedisTemplate.opsForValue().get(key);
- if(StrUtil.isNotBlank(jsonType)){
- List<ShopType> shopTypes = JSONUtil.toList(jsonType, ShopType.class);
- return Result.ok(shopTypes);
- }
- List<ShopType> shopTypes = shopTypeMapper.selectList(new QueryWrapper<>());
- if(shopTypes.isEmpty()){
- return Result.fail("您查询的页面不存在!");
- }
- stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shopTypes));
- return Result.ok(shopTypes);
- }
- }
业务场景:
低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存
主动更新策略:
操作缓存和数据库时有三个问题需要考虑:
1.删除缓存还是更新缓存?
更新缓存:每次更新都更新缓存,无效写操作较多
删除缓存:更新数据库时让缓存失效,查询时再更新缓存(胜出)
2.如何保证缓存与数据库的操作的同时成功或失败?
单体系统:将缓存与数据库操作放在一个事务
分布式系统:利用TCC等分布式事务方案
3.先操作缓存还是先操作数据库?
总结:
案例:给查询商铺的缓存添加超时剔除和主动更新的策略
修改ShopController中的业务逻辑,满足下面的需求:
①根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
②根据id修改店铺时,先修改数据库,再删除缓存
更新操作:
- @Override
- @Transactional
- public Result update(Shop shop) {
- Long id=shop.getId();
- if(id==null){
- return Result.fail("店铺id不能为空!");
- }
- //1.更新数据库
- updateById(shop);
- //2.删除缓存
- stringRedisTemplate.delete(CACHE_SHOP_KEY+shop.getId());
- return Result.ok();
- }
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库
- @Override
- public Result queryById(Long id) {
- String key=CACHE_SHOP_KEY+id;
- //1.从redis查询商铺缓存
- String shopJson = stringRedisTemplate.opsForValue().get(key);
- //2.判断是否存在
- if(StrUtil.isNotBlank(shopJson)) {
- //3.存在,直接返回
- Shop shop = JSONUtil.toBean(shopJson, Shop.class);
- return Result.ok(shop);
- }
- //命中的是否为空值
- if(shopJson!=null){
- return Result.fail("店铺信息不存在!");
- }
- //4.不存在,根据id查询数据库
- Shop shop = getById(id);
- //5.不存在,返回错误
- if(shop==null){
- //将空值写入redis
- stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
- return Result.fail("店铺信息不存在!");
- }
- //6.存在,写入redis
- stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
- //7.返回
- return Result.ok(shop);
- }
总结:
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
缓存null值
布隆过滤
增强id的复杂度,避免被猜测id规律
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流
缓存雪崩是指同一时刻大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力
解决方案:
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略
给业务添加多级缓存
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:
互斥锁
逻辑过期
互斥锁和逻辑过期对比:
互斥锁和逻辑过期优缺点:
需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
- public Shop queryWithMutex(Long id){
- String key = CACHE_SHOP_KEY+id;
- //从redis查询商铺缓存
- String shopJson = stringRedisTemplate.opsForValue().get(key);
- //判断缓存是否命中
- if(StrUtil.isNotBlank(shopJson)){
- //命中则直接返回数据
- return JSONUtil.toBean(shopJson, Shop.class);
- }
- //判断是否是缓存穿透
- if(shopJson!=null){
- return null;
- }
- //实现缓存重建
- //1.获取互斥锁
- String lockKey=LOCK_SHOP_KEY+id;
- try{
- boolean isLock = tryLock(lockKey);
- //2.判断是否获取成功
- if(!isLock) {
- //3.失败,则休眠并重试
- Thread.sleep(50);
- queryWithMutex(id);
- }
- //4.成功,根据id查询数据库
- Shop shop = getById(id);
- //5.不存在,返回错误
- if(shop==null){
- stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
- return null;
- }
- //6.存在,写入redis
- stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES);
- return shop;
- }catch(InterruptedException e){
- throw new RuntimeException(e);
- }finally{
- unlock(lockKey);
- }
- }
-
- private boolean tryLock(String key){
- Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
- return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
- }
-
- private void unlock(String key){
- stringRedisTemplate.delete(key);
- }
需求:修改根据id删除商铺的业务,基于逻辑过期的方式来解决缓存击穿问题
添加逻辑过期时间:
- @Data
- public class RedisData {
- private LocalDateTime expireTime;
- private Object data;
- }
整体实现逻辑:
- private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
-
- public Shop queryWithLogicalExpire(Long id){
- String key=CACHE_SHOP_KEY+id;
- String shopJson = stringRedisTemplate.opsForValue().get(key);
- //缓存未命中
- if(StrUtil.isBlank(shopJson)){
- return null;
- }
- //命中,需要先把json反序列化为对象
- RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
- Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
- LocalDateTime expireTime = redisData.getExpireTime();
- //判断是否过期
- if(expireTime.isAfter(LocalDateTime.now())) {
- //未过期,直接返回店铺信息
- return shop;
- }
- //已过期,需要缓存重建
- String lockKey=LOCK_SHOP_KEY+id;
- //获取互斥锁
- boolean isLock = tryLock(lockKey);
- //判断是否获取锁成功
- if(isLock) {
- //成功,开启独立线程,实现缓存重建
- CACHE_REBUILD_EXECUTOR.submit(()->{
- try {
- saveShop2Redis(id, CACHE_SHOP_TTL);
- }catch(Exception e){
- throw new RuntimeException(e);
- }finally{
- unlock(lockKey);
- }
- });
- }
- //返过期的商铺信息
- }
-
- private boolean tryLock(String key){
- Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.MINUTES);
- return BooleanUtil.isTrue(flag); //为防止程序在拆箱的时候出现空指针,要手动拆箱
- }
-
- private void unlock(String key){
- stringRedisTemplate.delete(key);
- }
-
- public void saveShop2Redis(Long id,Long expireSeconds){
- //1.查询店铺数据
- Shop shop = getById(id);
- //2.封装逻辑过期时间
- RedisData redisData = new RedisData();
- redisData.setData(shop);
- redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
- //3.写入Redis
- stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
- }
基于StringRedisTemplate封装一个缓存工具类,满足下列要求:
方法1:任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
- @Component
- public class CacheClient {
- private final StringRedisTemplate stringRedisTemplate;
-
- public CacheClient(StringRedisTemplate stringRedisTemplate){
- this.stringRedisTemplate=stringRedisTemplate;
- }
-
- public void set(String key, Object value, Long time, TimeUnit unit){
- stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
- }
-
- public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
- //设置逻辑过期
- RedisData redisData = new RedisData();
- redisData.setData(value);
- redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
- stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
- }
-
- public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit){
- String key=keyPrefix+id;
- String json = stringRedisTemplate.opsForValue().get(key);
- if(StrUtil.isNotBlank(json)){
- return JSONUtil.toBean(json,type);
- }
- //判断命中的是否为空值
- if(json!=null){
- return null;
- }
- R r = dbFallback.apply(id);
- if(r==null){
- stringRedisTemplate.opsForValue().set(key,"",time,unit);
- return null;
- }
- this.set(key,r,time,unit);
- return r;
- }
-
- private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
-
- public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){
- String key=keyPrefix+id;
- String json = stringRedisTemplate.opsForValue().get(key);
- if(StrUtil.isBlank(json)){
- return null;
- }
- RedisData redisData = JSONUtil.toBean(json, RedisData.class);
- R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
- LocalDateTime expireTime = redisData.getExpireTime();
- //判断是否过期
- if(expireTime.isAfter(LocalDateTime.now())){
- //未过期,直接返回店铺信息
- return r;
- }
- //已过期,需要缓存重建
- String lockKey=LOCK_SHOP_KEY+id;
- boolean isLock = tryLock(lockKey);
- if(isLock){
- CACHE_REBUILD_EXECUTOR.submit(()->{
- try{
- //重建缓存
- //1.查询数据库
- R apply = dbFallback.apply(id);
- //2.存入redis
- this.setWithLogicalExpire(key,apply,time,unit);
- }catch (Exception e){
- throw new RuntimeException(e);
- }finally{
- unlock(key);
- }
- });
- }
- return r;
- }
-
- private boolean tryLock(String key){
- Boolean flag=stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);
- return BooleanUtil.isTrue(flag);
- }
-
- private void unlock(String key){
- stringRedisTemplate.delete(key);
- }
- }
什么是缓存?
一种具备高效读写能力的数据暂存区域
缓存的作用?
降低后端负载
提高读写响应速度
缓存的成本?
开发成本
运维成本
一致性问题
三种策略:
内存淘汰:redis自带的内存淘汰机制
过期淘汰:利用expire命令给数据设置过期时间
主动更新:主动完成数据库与缓存的同时更新
策略选择:
低一致性需求:内存淘汰或过期淘汰
高一致性需求:
主动更新为主
过期淘汰兜底
Cache Aside:缓存调用者在更新数据库的同时完成对缓存的更新
一致性良好、实现难度一般
Read/Write Through:缓存与数据库集成为一个服务,服务保证两者的一致性,对外暴露API接口,调用者调用API,无需知道自己操作的是数据库还是缓存,不关心一致性
一致性优秀、实现复杂、性能一般
Write Back:缓存调用者的CRUD都针对缓存完成。有独立线程异步将缓存数据写到数据库,实现最终一致
一致性差、性能好、实现复杂
更新缓存还是删除缓存?
更新缓存会产生无效更新,并且存在较大的线程安全问题
删除缓存本质是延迟更新,没有无效更新,线程安全问题相对较低
先操作数据库还是缓存?
先更新数据,再删除缓存——在满足原子性的情况下,安全问题概率较低
先删除缓存,再更新数据库——安全问题概率极高
如何确保数据库与缓存操作原子性?
单体系统——利用事务机制
分布式系统——利用分布式事务机制
查询数据时
1.先查询缓存
2.如果缓存命中,直接返回
3.如果缓存未命中,则查询数据库
4.将数据库数据写入缓存
5.返回结果
修改数据库时:
1.先修改数据库
2.然后删除缓存
===>确保两者的原子性
产生原因:客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库
解决方案:
①缓存空对象:
思路:对于不存在的数据也在redis建立缓存,值为空,并设置一个较短的TTL时间
优点:实现简单,便于维护
缺点:额外的内存消耗、短期的数据不一致问题
②布隆过滤:
思路:利用布隆过滤算法,在请求进入Redis之前先判断是否存在,如果不存在则直接拒绝请求
优点:内存占用少
缺点:实现复杂、存在误判的可能性
③其他:
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流
产生原因:在同一时段大量的缓存key同时失效或者redis服务宕机,导致大量请求到达数据库,带来巨大压力
解决方案:
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略
给业务添加多级缓存
产生原因:
热点key:①在某一时段被高并发访问 ②缓存重建耗时较长
热点key突然过期,因为缓存重建耗时长,在这段时间内大量请求落到数据库,带来巨大冲击
解决方案:
互斥锁:
思路:给缓存重建过程加锁,确保重建过程只有一个线程执行,其他线程等待
优点:①实现简单 ②没有额外内存消耗 ③一致性好
缺点:①等待导致性能下降
缺点:有死锁风险
逻辑过期:
思路:
①热点key缓存永不过期,而是设置一个逻辑过期时间,查询到数据时通过对逻辑过期时间判断,来决定是否需要重建缓存
②重建缓存也通过互斥锁来保证单线程执行
③重建缓存利用独立线程异步执行
④其他线程无需等待,直接查询到旧数据即可
优点:现成无需等待,性能较好
缺点:
①不保证一致性
②有额外内存消耗
③实现复杂
每个店铺都可以发布优惠券:
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就会存在一些问题:
id的规律性太明显
受单表数据量的限制
全局ID生成器:
全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,可以支持每秒产生2^32个不同的ID
- @Component
- public class RedisIdWorker {
- //开始时间戳 2023-01-01 00:00:00
- private static final long BEGIN_TIMESTAMP=1672531200L;
- private static final int COUNT_BITS=32;
-
- private StringRedisTemplate stringRedisTemplate;
-
- public RedisIdWorker(StringRedisTemplate stringRedisTemplate){
- this.stringRedisTemplate=stringRedisTemplate;
- }
-
- public long nextId(String keyPrefix){
- //1.生成时间戳
- LocalDateTime now = LocalDateTime.now();
- long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
- long timestamp=nowSecond-BEGIN_TIMESTAMP;
- //2.生成序列号
- //2.1 获取当前日期,精确到天
- String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
- //2.2 自增长
- long count=stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);
- //3.拼接并返回
- return timestamp<<COUNT_BITS | count;
- }
-
- }
测试代码:
- @Resource
- private RedisIdWorker redisIdWorker;
-
- private ExecutorService es= Executors.newFixedThreadPool(500);
- @Test
- void testIdWorker() throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(300);
- Runnable task=()->{
- for (int i = 0; i < 100; i++) {
- long id = redisIdWorker.nextId("order");
- System.out.println("id="+id);
- }
- latch.countDown();
- };
- latch.countDown();
- long begin = System.currentTimeMillis();
- for (int i = 0; i < 300; i++) {
- es.submit(task);
- }
- latch.await();
- long end=System.currentTimeMillis();
- System.out.println("time="+(end-begin));
- }
总结:
全局唯一ID生成策略:
UUID
Redis自增
snowflake算法
数据库自增
Redis自增ID策略:
每天一个key,方便统计订单量
ID构造是 时间戳+计数器
每个店铺都可以发布优惠券,分为评价券和特价劵。平价券可以任意购买,而特价券需要秒杀抢购:
表关系如下:
tb_voucher:优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher:优惠券的库存、开始抢购时间、结束抢购时间。特价优惠券才需要填写这些信息。
通过postman添加优惠券:
- {
- "shopId":1,
- "title":"100元代金券",
- "subTitle":"周一至周五均可使用",
- "rules":"全场通用\\n无需预约\\n可无限叠加\\n不兑换、不找零\\n仅限堂食",
- "payValue":8000,
- "actualValue":10000,
- "type":1,
- "stock":100,
- "beginTime":"2023-04-29T10:09:17",
- "endTime":"2023-04-29T23:09:04"
- }
下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
- @Service
- public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
-
- @Resource
- private ISeckillVoucherService seckillVoucherService;
-
- @Resource
- private RedisIdWorker redisIdWorker;
-
- @Override
- @Transactional
- public Result seckillVoucher(Long voucherId) {
- //1.查询优惠券
- SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
- //2.判断秒杀是否开始
- if(voucher.getBeginTime().isAfter(LocalDateTime.now())) {
- //尚未开始
- return Result.fail("秒杀尚未开始!");
- }
- //3.判断秒杀是否结束
- if(voucher.getEndTime().isBefore(LocalDateTime.now())){
- //已经结束
- return Result.fail("秒杀已经结束!");
- }
- //4.判断库存是否充足
- if(voucher.getStock()<1){
- //库存不足
- return Result.fail("库存不足!");
- }
- //5.扣减库存
- boolean success=seckillVoucherService
- .update()
- .setSql("stock=stock-1")
- .eq("voucher_id",voucherId)
- .update();
- if(!success){
- //扣减失败
- return Result.fail("库存不足!");
- }
- //6.创建订单
- VoucherOrder voucherOrder = new VoucherOrder();
- //6.1 订单id
- long orderId = redisIdWorker.nextId("order");
- voucherOrder.setId(orderId);
- //6.2 用户id
- Long userId = UserHolder.getUser().getId();
- voucherOrder.setUserId(userId);
- //6.3 代金券id
- voucherOrder.setVoucherId(voucherId);
- save(voucherOrder);
- //7.返回订单id
- return Result.ok(orderId);
- }
- }
用jmeter模拟高并发:
模拟结果:
原因:
超卖问题就是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:
乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:
版本号法
CAS法
- boolean success=seckillVoucherService
- .update()
- .setSql("stock=stock-1")
- .eq("voucher_id",voucherId).eq("stock",voucher.getStock())
- .update();
乐观锁问题:成功率太低
乐观锁优化:
- boolean success=seckillVoucherService
- .update()
- .setSql("stock=stock-1")
- .eq("voucher_id",voucherId).gt("stock",0)
- .update();
运行结果:50%的失败率,订单恰好没有超卖
超卖这样的线程安全问题,解决方案有哪些?
1.悲观锁:添加同步锁,让线程串行执行
优点:简单粗暴
缺点:性能一般
2.乐观锁:不加锁,在更新时判断是否有其他线程在修改
优点:性能好
缺点:存在成功率低的问题
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
代码实现:
- //一人一单
- Long userId = UserHolder.getUser().getId();
- Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
- if(count>0){
- //用户已经存在
- return Result.fail("该用户已经购买过一次!");
- }
对于高并发,解决线程安全问题:
(先提交事务,再释放锁)
第一步:添加依赖
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjweaver</artifactId>
- </dependency>
第二步:暴露代理对象 @EnableAspectJAutoProxy(exposeProxy=true)
- @MapperScan("com.hmdp.mapper")
- @SpringBootApplication
- @EnableAspectJAutoProxy(exposeProxy = true)
- public class HmDianPingApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(HmDianPingApplication.class, args);
- }
-
- }
第三步:对方法加锁
如果用this.createVoucherOrder(voucherId); 首先我们要知道事务如果想生效,需要Spring对该类做动态代理,拿到了代理对象,来对事务进行处理
这个this是没有事务功能的,因为拿到的目标对象(非代理对象)
所以需要拿到事务代理对象AopContext.currentProxy()
- synchronized (userId.toString().intern()) {
- //获取代理对象(事务)
- IVoucherOrderService proxy =(IVoucherOrderService) AopContext.currentProxy();
- return proxy.createVoucherOrder(voucherId);
- }
- @Transactional
- public Result createVoucherOrder(Long voucherId) {
- //一人一单
- Long userId = UserHolder.getUser().getId();
- Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
- if (count > 0) {
- //用户已经存在
- return Result.fail("该用户已经购买过一次!");
- }
- //5.扣减库存
- boolean success = seckillVoucherService
- .update()
- .setSql("stock=stock-1")
- .eq("voucher_id", voucherId).gt("stock", 0)
- .update();
- if (!success) {
- //扣减失败
- return Result.fail("库存不足!");
- }
- //6.创建订单
- VoucherOrder voucherOrder = new VoucherOrder();
- //6.1 订单id
- long orderId = redisIdWorker.nextId("order");
- voucherOrder.setId(orderId);
- //6.2 用户id
- voucherOrder.setUserId(userId);
- //6.3 代金券id
- voucherOrder.setVoucherId(voucherId);
- save(voucherOrder);
- //7.返回订单id
- return Result.ok(orderId);
-
- }
压测结果:
因为集群模式下,每个JVM有各自的锁监视器
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
分布式锁的实现:
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见有三种:
实现分布式锁时需要实现的两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁
#添加锁,利用setnx的互斥特性
setnx lock thread1
#添加锁过期时间,避免服务宕机引起的死锁
expire lock 10
非阻塞:尝试一次,成功返回true,失败返回false
释放锁:
手动释放:del key
超时释放:获取锁时添加一个超时时间
需求:定义一个类,实现下面的接口,利用Redis实现分布式锁功能
- @AllArgsConstructor
- public class SimpleRedisLock implements ILock{
-
- private String name;
- private StringRedisTemplate stringRedisTemplate;
- private static final String KEY_PREFIX="lock:";
-
- @Override
- public boolean tryLock(long timeoutSec) {
- long threadId = Thread.currentThread().getId();
- Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
- return Boolean.TRUE.equals(success);
- }
-
- @Override
- public void unlock() {
- //释放锁
- stringRedisTemplate.delete(KEY_PREFIX+name);
- }
- }
优化一人一单:
- Long userId=UserHolder.getUser().getId();
- //创建锁对象
- SimpleRedisLock lock = new SimpleRedisLock("order:"+userId,stringRedisTemplate);
- boolean isLock = lock.tryLock(1200);
- if(!isLock) {
- //获取锁失败,返回错误或重试
- return Result.fail("一个人只能下一单!");
- }
- try {
- //获取代理对象(事务)
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
- return proxy.createVoucherOrder(voucherId);
- }finally {
- lock.unlock();
- }
解决方案:在释放锁前判断这个锁是否还属于自己
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标识(可以用UUID表示)
2.在释放锁时现货区锁中的线程标识,判断是否与当前线程标识一致
如果一致则释放锁
如果不一致则不释放锁
- @AllArgsConstructor
- public class SimpleRedisLock implements ILock{
-
- private String name;
- private StringRedisTemplate stringRedisTemplate;
- private static final String KEY_PREFIX="lock:";
- private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
-
- @Override
- public boolean tryLock(long timeoutSec) {
- String threadId =ID_PREFIX + Thread.currentThread().getId();
- Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS);
- return Boolean.TRUE.equals(success);
- }
-
- @Override
- public void unlock() {
- //获取线程标识
- String threadId=ID_PREFIX + Thread.currentThread().getId();
- //获取锁中的标识
- String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
- //判断标识是否一致
- if(threadId.equals(id)) {
- //释放锁
- stringRedisTemplate.delete(KEY_PREFIX + name);
- }
- }
- }
判断和释放锁是两个动作,这中间可能会发生并发问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言。
这里重点介绍Redis提供的调用函数,语法如下:
- #执行redis命令
- redis.call('命令名称','key','其它参数',...)
- #比如,我们要执行set name jack
- redis.call('set','name','jack')
-
- #如果我们要执行set name Rose,再执行get name,则脚本如下
- redis.call('set','name','Rose')
- local name = redis.call('get','name')
- return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本常见命令如下:
执行无参脚本:
执行有参脚本:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入keys数组,其它参数会放入argv数组,在脚本中可以从keys和argv数组获取这些参数:
基于Redis的分布式锁
释放锁的业务流程:
1.获取锁中的线程标识
2.判断是否与指定的标识(当前线程标识)一致
3.如果一致则释放锁(删除)
4.如果不一致则什么都不做
- -- 锁的key
- local key = KEYS[1]
- -- 当前线程标识
- local threadId = ARGV[1]
-
- -- 获取锁中的线程标识 get key
- local id = redis.call('get',key)
- --比较线程标识与锁中的标识是否一致
- if(id == threadId) then
- -- 释放锁
- return redis.call('del',key)
- end
- return 0
脚本优化:
- if(redis.call('get',KEYS[1]==ARGV[1])) then
- return redis.call('del',KEYS[1])
- end
- return 0
需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedisTemplate调用Lua脚本的API如下:
- private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
- static {
- UNLOCK_SCRIPT=new DefaultRedisScript<>();
- UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
- UNLOCK_SCRIPT.setResultType(Long.class);
- }
- @Override
- public void unlock() {
- //调用lua脚本
- stringRedisTemplate.execute(UNLOCK_SCRIPT,
- Collections.singletonList(KEY_PREFIX+name),
- ID_PREFIX+Thread.currentThread().getId());
- }
基于Redis的分布式锁实现思路:
利用setnx nx ex获取锁,并设置过期时间,保存线程标识
释放锁时先判断线程标识与自己是否一致,一致则删除锁
特性:
利用set nx满足互斥性
利用set ex保证故障时依然能释放,避免死锁,提高安全性
利用Redis集群保证高可用和高并发特性
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
第一步:引入依赖
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- <version>3.20.0</version>
- </dependency>
第二步:配置Redisson
- @Configuration
- public class RedissonConfig {
-
- @Bean
- public RedissonClient redissonClient(){
- //配置
- Config config = new Config();
- config.useSingleServer().setAddress("redis://192.168.202.128:6379").setPassword("123321");
- //创建RedissonClient对象
- return Redisson.create(config);
- }
- }
3.使用Redisson分布式锁
使用jmeter压测通过,一个用户只能下一单:
redis无法实现可重入锁的原因:
Redisson可以实现可重入锁的原理:
lua脚本实现:
测试代码:
- @SpringBootTest
- @Slf4j
- public class RedissonTest {
-
- @Autowired
- private RedissonClient redissonClient;
-
- private RLock lock;
-
- @BeforeEach
- void setup(){
- lock=redissonClient.getLock("order");
- }
-
- @Test
- void method1(){
- boolean isLock = lock.tryLock();
- if(!isLock){
- log.error("获取锁失败……1");
- return;
- }
- try{
- log.info("获取锁成功……1");
- method2();
- log.info("开始执行业务……1");
- }finally{
- log.warn("准备释放锁……1");
- lock.unlock();
- }
- }
-
- void method2(){
- boolean isLock = lock.tryLock();
- if(!isLock){
- log.error("获取锁失败……2");
- return;
- }
- try{
- log.info("获取锁成功……2");
- log.info("开始执行业务……2");
- }finally{
- log.warn("准备释放锁……2");
- lock.unlock();
- }
- }
- }
运行截图:
查看源码实现:
trylock:
unlock:
Redisson分布式锁原理:
可重入:利用hash结构记录线程id和重入次数
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间
修改method1:
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
跟入源码:
注:redis.call('pttl',KEYS[1]);中的pttl返回以毫秒为单位,而ttl返回以秒为单位
如果该线程拿到锁,则返回nil;否则返回剩余有效期
代码精华部分:
- while (true) {
- long currentTime = System.currentTimeMillis();
- ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- return true;
- }
-
- time -= System.currentTimeMillis() - currentTime;
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
-
- // waiting for message
- currentTime = System.currentTimeMillis();
- if (ttl >= 0 && ttl < time) {
- commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } else {
- commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
- }
-
- time -= System.currentTimeMillis() - currentTime;
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- }
- } finally {
- unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
- }
怎么确保锁是因为业务结束而释放,而非阻塞导致的超时释放呢?
自动更新续期:
scheduleExpirationRenewal(threadId);
renewExpiration:更新有效期
实现永不过期的代码:
- Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
- if (ent == null) {
- return;
- }
- Long threadId = ent.getFirstThreadId();
- if (threadId == null) {
- return;
- }
-
- CompletionStage<Boolean> future = renewExpirationAsync(threadId);
- future.whenComplete((res, e) -> {
- if (e != null) {
- log.error("Can't update lock {} expiration", getRawName(), e);
- EXPIRATION_RENEWAL_MAP.remove(getEntryName());
- return;
- }
-
- if (res) {
- // reschedule itself
- renewExpiration();
- } else {
- cancelExpirationRenewal(null);
- }
- });
- }
- }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
那么,何时取消计时刷新呢?
在锁释放的时候结束计时
redisson分布式锁主从一致问题:如果主节点宕机,而未将数据同步给从节点,可能会导致并发问题
解决方案:所有节点都变成独立的redis节点
总结:
1)不可重入的Redis分布式锁:
原理:利用setnx的互斥性,利用ex避免死锁;释放锁时判断线程标识
缺陷:不可重入、无法重试、锁超时失败
2)可重入的Redis分布式锁:
原理:利用hash结果,记录线程标识和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
缺陷:redis宕机引起锁失效问题
3)Redisson的multiLock:
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂
怎么保证一人一单呢?Set集合
怎么确保代码执行的原子性?lua脚本
需求:
①新增秒杀优惠券的同时,将优惠券的信息保存到Redis中
-
- @Override
- @Transactional
- public void addSeckillVoucher(Voucher voucher) {
- // 保存优惠券
- save(voucher);
- // 保存秒杀信息
- SeckillVoucher seckillVoucher = new SeckillVoucher();
- seckillVoucher.setVoucherId(voucher.getId());
- seckillVoucher.setStock(voucher.getStock());
- seckillVoucher.setBeginTime(voucher.getBeginTime());
- seckillVoucher.setEndTime(voucher.getEndTime());
- seckillVoucherService.save(seckillVoucher);
- //保存秒杀库存到Redis中
- stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
-
- }
新增优惠券:
②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- -- 1.参数列表
- -- 1.1 优惠券id
- local voucherId = ARGV[1]
- --1.2 用户id
- local userId = ARGV[2]
-
- -- 2.数据key
- -- 2.1 库存key
- local stockKey = 'seckill:stock:' .. voucherId
- -- 2.2 订单key
- local orderKey = 'seckill:order:' .. voucherId
-
- -- 3.脚本业务
- -- 3.1 判断库存是否充足 get stockKey
- if(tonumber(redis.call('get',stockKey))<=0) then
- -- 3.2 库存不足,返回1
- return 1
- end
- -- 3.2 判断用户是否下单 SISMEMBER orderKey userId
- if(redis.call('sismember',orderKey,userId) == 1) then
- -- 3.3 存在,说明是重复下单,返回2
- return 2
- end
- -- 3.4 扣库存 incrby stockKey -1
- redis.call('incrby',stockKey,-1)
- -- 3.5 下单(保存用户)sadd orderKey userId
- redis.call('sadd',orderKey,userId)
- return 0
③如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- @Resource
- private ISeckillVoucherService seckillVoucherService;
-
- @Resource
- private RedisIdWorker redisIdWorker;
-
- @Autowired
- private StringRedisTemplate stringRedisTemplate;
-
- private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
- static {
- SECKILL_SCRIPT = new DefaultRedisScript<>();
- SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
- SECKILL_SCRIPT.setResultType(Long.class);
- }
- @Override
- public Result seckillVoucher(Long voucherId) {
- // 取出用户
- Long userId = UserHolder.getUser().getId();
- //1. 执行lua脚本
- Long result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(),userId.toString()
- );
- int r=result.intValue();
- //2. 判断结果是否为0
- if(r != 0) {
- //2.1 不为0,没有购买资格
- return Result.fail(r==1 ? "库存不足" : "不能重复下单");
- }
- //2.1 为0,有购买资格,把下单信息保存到阻塞队列
- long orderId = redisIdWorker.nextId("order");
- // TODO 保存阻塞队列
- //3.返回订单id
- return Result.ok(orderId);
-
- }
④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
源代码有大改动,包括之前写好的createVoucherOrder,所以附上完整源码:
- @Service
- @Slf4j
- public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
-
- @Resource
- private ISeckillVoucherService seckillVoucherService;
-
- @Resource
- private RedisIdWorker redisIdWorker;
-
- @Resource
- private RedissonClient redissonClient;
-
- @Autowired
- private StringRedisTemplate stringRedisTemplate;
-
- private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<>(1024*1024);
- private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
- private IVoucherOrderService proxy;
- @PostConstruct
- private void init(){
- SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
- }
- private class VoucherOrderHandler implements Runnable{
- @Override
- public void run() {
- while(true){
- try {
- //1.获取队列中的订单信息
- VoucherOrder voucherOrder = orderTasks.take();//没有任务时会自动阻塞
- //2.创建订单
- handleVoucherOrder(voucherOrder);
- } catch (InterruptedException e) {
- log.error("处理订单异常",e);
- }
- }
- }
- }
-
- private void handleVoucherOrder(VoucherOrder voucherOrder) {
- //1.获取用户
- Long userId = voucherOrder.getUserId();
- //2.创建锁对象
- RLock lock = redissonClient.getLock("lock:order:" + userId);
- //3.获取锁
- boolean isLock = lock.tryLock();
- //4.判断是否获取锁成功
- if(!isLock){
- log.error("不允许重复下单");
- return;
- }
- try{
- proxy.createVoucherOrder(voucherOrder);
- }finally{
- lock.unlock();
- }
-
- }
-
- private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
- static {
- SECKILL_SCRIPT = new DefaultRedisScript<>();
- SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
- SECKILL_SCRIPT.setResultType(Long.class);
- }
- @Override
- public Result seckillVoucher(Long voucherId) {
- // 取出用户
- Long userId = UserHolder.getUser().getId();
- //1. 执行lua脚本
- Long result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(),userId.toString()
- );
- int r=result.intValue();
- //2. 判断结果是否为0
- if(r != 0) {
- //2.1 不为0,没有购买资格
- return Result.fail(r==1 ? "库存不足" : "不能重复下单");
- }
- //2.1 为0,有购买资格,把下单信息保存到阻塞队列
- long orderId = redisIdWorker.nextId("order");
- // TODO 保存阻塞队列
- VoucherOrder voucherOrder = new VoucherOrder();
- voucherOrder.setId(orderId);
- voucherOrder.setUserId(userId);
- voucherOrder.setVoucherId(voucherId);
- orderTasks.add(voucherOrder);
- //3.返回订单id
- return Result.ok(orderId);
-
- }
-
- @Transactional
- public void createVoucherOrder(VoucherOrder voucherOrder) {
- //一人一单
- Long userId = voucherOrder.getUserId();
- Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
- if (count > 0) {
- //用户已经存在
- log.error("用户已经购买过一次");
- return;
- }
- //5.扣减库存
- boolean success = seckillVoucherService
- .update()
- .setSql("stock=stock-1")
- .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
- .update();
- if (!success) {
- //扣减失败
- log.error("库存不足");
- }
- save(voucherOrder);
-
- }
- }
秒杀业务的优化思路是什么?
①先利用Redis完成库存余量、一人一单判断,完成抢单业务
②再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
内存限制问题
数据安全问题
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
Redis提供了三种不同的方式来实现消息队列:
list结构:基于List结构模拟消息队列
PubSub:基本的点对点消息模型
Stream:比较完善的消息队列模型
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果
队列的入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用的是BRPOP或者BLPOP来实现阻塞效果。
示例:
基于List的消息队列有哪些优缺点?
优点:
利用Redis存储,不受限于JVM内存上限
基于Redis的持久化机制,数据安全性有保证
可以满足消息有序性
缺点:
无法避免消息丢失
只支持单消费者
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg:向一个频道发送消息
PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道
pattern:
?代表一个字符
* 代表零个或多个字符
[ae] 代表可以是a也可以是e
示例:
基于PubSub的消息队列有哪些优缺点?
优点:
采用发布订阅模型,支持多生产、多消费
缺点:
不支持数据持久化
无法避免消息丢失
消息堆积有上限,超出时数据丢失
Stream是Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列
示例1:
===>结论 :基于stream的消息队列,消息会被持久化存储,可以被多个消费者读取,也可以被一个消费者读取多次
示例2:阻塞等待,block 0 代表一直阻塞直到有新消息的到来
消费者应用:
bug:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
消息漏读:
STREAM类型消息队列的XREAD命令特点:
消息可回溯
一个消息可以被多个消费者读取
可以阻塞读取
有消息漏读的风险
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
==>队列中的消息如果还想用,ID从0开始;如果不想用了,ID从$开始
示例:
Java代码实现:
STREAM类型消息队列的XREADGROUP命令特点:
消息可回溯
可以多消费者争抢消息,加快消费速度
可以阻塞读取
没有消息漏读的风险
有消息确认机制,保证消息至少被消费一次
总结:
需求:
①创建一个Stream类型的消息队列,名为stream.orders
参数MKSTREAM,在创建组的时候,如果消息队列不存在,则自动创建组和队列
②修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
添加orderId:
- --1.3 订单id
- local orderId = ARGV[3]
发送消息到队列中
- -- 3.6 发送消息到队列中
- redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
改造Java代码:
- @Override
- public Result seckillVoucher(Long voucherId) {
- // 取出用户
- Long userId = UserHolder.getUser().getId();
- long orderId = redisIdWorker.nextId("order");
- //1. 执行lua脚本
- Long result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(),userId.toString(),String.valueOf(orderId)
- );
- int r=result.intValue();
- //2. 判断结果是否为0
- if(r != 0) {
- //2.1 不为0,没有购买资格
- return Result.fail(r==1 ? "库存不足" : "不能重复下单");
- }
- proxy=(IVoucherOrderService) AopContext.currentProxy();
- //3.返回订单id
- return Result.ok(orderId);
- }
③项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
- private class VoucherOrderHandler implements Runnable{
- String queueName = "stream.orders";
- @Override
- public void run() {
- while(true) {
- try {
- //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
- List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
- Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
- StreamOffset.create(queueName, ReadOffset.lastConsumed())
- );
- //2.判断消息获取是否成功
- if (list == null || list.isEmpty()) {
- // 如果获取失败,说明没有消息,继续下一次循环
- continue;
- }
- // 解析消息中的订单信息
- MapRecord<String, Object, Object> record = list.get(0);
- Map<Object, Object> values = record.getValue();
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
- //3.创建订单
- handleVoucherOrder(voucherOrder);
- //4.ACK确认 SACK stream.orders g1 id
- stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
- } catch (Exception e) {
- log.error("处理订单异常", e);
- handlePendingList();
- }
- }
- }
-
- private void handlePendingList(){
- while(true){
- try{
- //1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0
- List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
- Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1),
- StreamOffset.create(queueName, ReadOffset.from("0"))
- );
- //2.判断消息是否获取成功
- if(list == null || list.isEmpty()){
- //没有消息,说明pending-list中没有异常消息,结束循环
- break;
- }
- //3.解析消息中的订单信息
- MapRecord<String, Object, Object> record = list.get(0);
- Map<Object, Object> values = record.getValue();
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
- //4.如果获取成功,可以下单
- handleVoucherOrder(voucherOrder);
- //5.ACK确认
- stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
- }catch(Exception e){
- log.error("处理pending-list订单异常",e);
- try {
- Thread.sleep(20);
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:
tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
tb_blog_comments:其他用户对探店笔记的评价
第一步:将SystemConstants的常量改为部署在nginx项目下的imgs目录
public static final String IMAGE_UPLOAD_DIR = "D:\\lesson\\nginx-1.18.0\\html\\hmdp\\imgs\\";
测试效果:
Controller接口:
- @GetMapping("/{id}")
- public Result queryBlogById(@PathVariable("id") Long id){
- return blogService.queryBlogById();
- }
Service实现:
- @Override
- public Result queryBlogById(Long id) {
- //1.查询blog
- Blog blog = getById(id);
- if(blog == null){
- return Result.fail("笔记不存在!");
- }
- //2.查询blog有关的用户
- queryBlogUser(blog);
- return Result.ok(blog);
- }
-
- private void queryBlogUser(Blog blog) {
- Long userId = blog.getUserId();
- User user = userService.getById(userId);
- blog.setName(user.getNickName());
- blog.setIcon(user.getIcon());
- }
需求:
同一个用户只能点赞一次,再次点击则取消点赞
如果当前用户已经点赞,则点赞按钮高亮显示(前段已实现,判断字段Blog类的isLike属性)
实现步骤:
①给Blog类中添加一个isLike字段,标识是否被当前用户点赞
- /**
- * 是否点赞过了
- */
- @TableField(exist = false)
- private Boolean isLike;
②修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1
③修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值給isLike字段
④修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
- @PutMapping("/like/{id}")
- public Result likeBlog(@PathVariable("id") Long id) {
- return blogService.likeBlog(id);
- }
- @Override
- public Result queryHotBlog(Integer current) {
- // 根据用户查询
- Page<Blog> page = query()
- .orderByDesc("liked")
- .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
- // 获取当前页数据
- List<Blog> records = page.getRecords();
- // 查询用户
- records.forEach(blog->{
- this.queryBlogUser(blog);
- this.isBlogLiked(blog);
- });
- return Result.ok(records);
- }
-
- @Override
- public Result queryBlogById(Long id) {
- //1.查询blog
- Blog blog = getById(id);
- if(blog == null){
- return Result.fail("笔记不存在!");
- }
- //2.查询blog有关的用户
- queryBlogUser(blog);
- //3.查询blog是否被点赞
- isBlogLiked(blog);
- return Result.ok(blog);
- }
-
- private void isBlogLiked(Blog blog) {
- Long userId = UserHolder.getUser().getId();
- String key = "blog:liked:" + blog.getId();
- Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
- blog.setIsLike(BooleanUtil.isTrue(isMember));
- }
-
- @Override
- public Result likeBlog(Long id) {
- //1. 获取登录用户
- Long userId = UserHolder.getUser().getId();
- //2. 判断当前登录用户是否已经点赞
- String key = "blog:liked:" + id;
- Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
- if(BooleanUtil.isFalse(isMember)) {
- //3. 如果未点赞,可以点赞
- //3.1 数据库点赞数+1
- boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
- //3.2 保存用户到Redis的set集合
- if(isSuccess){
- stringRedisTemplate.opsForSet().add(key,userId.toString());
- }
- }else {
- //4. 如果已经点赞,则取消点赞
- //4.1 数据库点赞数-1
- boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
- //4.2 把用户从Redis的set集合移除
- if (isSuccess) {
- stringRedisTemplate.opsForSet().remove(key, userId.toString());
- }
- }
- return Result.ok();
- }
需求:按照点赞时间先后排序,返回Top5的用户
一人只能点赞一次功能实现:
因为SortedSet中没有isMember的判断,所以在添加元素的时候加上时间戳;查询的时候如果对应的元素没有时间戳,则代表集合中没有这个元素
- private void isBlogLiked(Blog blog) {
- UserDTO user = UserHolder.getUser();
- if(user==null){
- return;
- }
- Long userId = UserHolder.getUser().getId();
- String key = BLOG_LIKED_KEY + blog.getId();
- Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
- blog.setIsLike(score!=null);
- }
-
- @Override
- public Result likeBlog(Long id) {
- //1. 获取登录用户
- Long userId = UserHolder.getUser().getId();
- //2. 判断当前登录用户是否已经点赞
- String key = BLOG_LIKED_KEY + id;
- Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
- if(score==null) {
- //3. 如果未点赞,可以点赞
- //3.1 数据库点赞数+1
- boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
- //3.2 保存用户到Redis的set集合 zadd key value score
- if(isSuccess){
- stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
- }
- }else {
- //4. 如果已经点赞,则取消点赞
- //4.1 数据库点赞数-1
- boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
- //4.2 把用户从Redis的set集合移除
- if (isSuccess) {
- stringRedisTemplate.opsForZSet().remove(key, userId.toString());
- }
- }
- return Result.ok();
- }
查询点赞排行前五:
- @Override
- public Result queryBlogLikes(Long id) {
- String key=BLOG_LIKED_KEY+id;
- //1. 查询top5的点赞用户 zrange key 0 4
- Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
- if(top5 == null || top5.isEmpty()){
- return Result.ok(Collections.emptyList());
- }
- //2. 解析出其中的用户id
- List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
- String idStr = StrUtil.join(",", ids);
- //3. 根据用户id查询用户
- List<UserDTO> userDTOs = userService
- .query()
- .in("id",ids)
- .last("order by field(id,"+idStr+")")
- .list()
- .stream()
- .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
- .collect(Collectors.toList());
- //4. 返回
- return Result.ok(userDTOs);
- }
需求:基于该表数据结构,实现两个接口:
①关注和取关接口
②判断是否关注的接口
Controller实现:
- @RestController
- @RequestMapping("/follow")
- public class FollowController {
- @Resource
- private IFollowService followService;
-
- @PutMapping("/{id}/{isFollow}")
- public Result follow(@PathVariable("id")Long followUserId,@PathVariable("isFollow")Boolean isFollow){
- return followService.follow(followUserId,isFollow);
- }
-
- @GetMapping("/or/not/{id}")
- public Result follow(@PathVariable("id")Long followUserId){
- return followService.isFollow(followUserId);
- }
- }
ServiceImpl具体实现:
- @Service
- public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
-
- @Override
- public Result follow(Long followUserId, Boolean isFollow) {
- // 获取登录用户
- Long userId = UserHolder.getUser().getId();
- //1.判断到底是关注还是取关
- if (isFollow) {
- //2.关注,新增数据
- Follow follow = new Follow();
- follow.setUserId(userId);
- follow.setFollowUserId(followUserId);
- save(follow);
- } else {
- //3.取关,删除
- remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));
- }
- return Result.ok();
- }
-
- @Override
- public Result isFollow(Long followUserId) {
- //1.获取登录用户
- Long userId = UserHolder.getUser().getId();
- //2.查询是否关注
- Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
- //3.判断
- return Result.ok(count>0);
- }
- }
将下面的代码插入到UserController:
- @GetMapping("/{id}")
- public Result queryUserById(@PathVariable("id")Long userId){
- User user = userService.getById(userId);
- if(user == null){
- return Result.ok();
- }
- UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
- return Result.ok(userDTO);
- }
将下面的代码插入到BlogController
- @GetMapping("/of/user")
- public Result queryBlogByUserId(
- @RequestParam(value="current",defaultValue = "1")Integer current,
- @RequestParam("id")Long id){
- Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
- List<Blog> records = page.getRecords();
- return Result.ok(records);
- }
要实现共同关注的查找,可以考虑集合set的交集
需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友
①修改follow代码,将用户和关注的用户加入集合
- @Override
- public Result follow(Long followUserId, Boolean isFollow) {
- // 获取登录用户
- Long userId = UserHolder.getUser().getId();
- String key="follows:"+userId;
- //1.判断到底是关注还是取关
- if (isFollow) {
- //2.关注,新增数据
- Follow follow = new Follow();
- follow.setUserId(userId);
- follow.setFollowUserId(followUserId);
- boolean isSuccess = save(follow);
- //判断是否关注成功
- if(isSuccess){
- //如果关注成功,把关注用户的id放入redis的set集合
- stringRedisTemplate.opsForSet().add(key,followUserId.toString());
- }
-
- } else {
- //3.取关,删除
- boolean isSuccess = remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", followUserId));
- if (isSuccess) {
- stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
- }
- }
- return Result.ok();
- }
②求共同关注
controller:
- @GetMapping("/commons/{id}")
- public Result followCommons(@PathVariable("id") Long id){
- return followService.followCommons(id);
- }
具体实现:
- @Resource
- private IUserService userService;
- @Override
- public Result followCommons(Long id) {
- //1.获取当前用户
- Long userId = UserHolder.getUser().getId();
- String key="follows:"+userId;
- //2.求交集
- String key2="follows:"+id;
- Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
- if(intersect==null || intersect.isEmpty()){
- //无交集
- return Result.ok(Collections.emptyList());
- }
- //3.解析id集合
- List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
- //4.查询用户
- List<UserDTO> users = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
- return Result.ok(users);
-
- }
运行效果:
关注推送也叫作Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无线下拉刷新获取新的信息。
Feed流产品有两种常见的模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈。
优点:信息全面,不会有缺失。并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣的信息来吸引用户
优点:投喂用户感兴趣的信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能起到反作用
本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:
①拉模式
②推模式
③推拉结合
拉模式:也叫读扩散
每次发送消息的时候都会加上一个时间戳,当用户打开个人页面,就会去关注的人的发件箱里拉取消息并按时间戳排序。缺点是耗时较长,优点是节省内存。
推模式:也叫作写扩散。
关注的博主发消息的时候会直接把消息推送到个人主页,并排好序。在本人阅读个人主页的时候,无需等待拉取信息等。优点是延时低,缺点是消耗内存。
推拉结合模式:也叫读写混合,兼具推和拉两种模式的优点。
对于普通人,发送的消息可以直接推送到粉丝的收件箱。
对于大V,发送的消息,对于活跃粉丝使用推模式,对于普通粉丝使用拉模式。
总结:
需求:
①修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
②收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
③查询收件箱数据时,可以实现分页查询
Feed流的滚动分页:
Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。
接口:
- @PostMapping
- public Result saveBlog(@RequestBody Blog blog) {
- return blogService.saveBlog(blog);
- }
具体实现:
- @Resource
- private IFollowService followService;
- @Override
- public Result saveBlog(Blog blog) {
- //1.获取登录用户
- UserDTO user = UserHolder.getUser();
- blog.setUserId(user.getId());
- //2.保存探店笔记
- boolean isSuccess = save(blog);
- if(!isSuccess){
- return Result.fail("新增笔记失败!");
- }
- //3.查询笔记作者的所有粉丝
- List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
- //4.返回id
- for(Follow follow:follows){
- //4.1 获取粉丝id
- Long userId = follow.getUserId();
- //4.2 推送
- String key="feed:"+userId;
- stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
- }
- return Result.ok(blog.getId());
- }
需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:
以z1为例复习相应命令:
①倒序按范围查询
②混乱插入(此时插入一条数据,继续查询)
可以发现按角标查询,会出现内容重复
③解决方法:可以按照分数查询
zrevrangebyscore key max min withscores limit offsest count
其中offset中0代表小于等于max的第一条,如果要实现小于max的第一条,则应将offset置1
count代表一次查询多少条(数量)
④存在的问题:如果value不一样,但是分数值一样,可能会查询到重复的数据
将m7的分数值改为6:
此时再按照记忆的最后一条分数去做查询,会发现数据重复:
⑤总结:
滚动分页查询参数:
max:当前时间戳或者上一次查询的最小时间戳
min:0
offset:0 或者 在上一次的结果中,与最小值一样的元素的个数
count:3(与前端约定好)
①定义滚动查询结果的实体类
- @Data
- public class ScrollResult {
- private List<?> list;
- private Long minTime;
- private Integer offset;
- }
②定义接口
- @GetMapping("/of/follow")
- public Result queryBlogOfFollow(@RequestParam("lastId")Long max,@RequestParam(value = "offset",defaultValue = "0")Integer offset){
- return blogService.queryBlogOfFollow(max,offset);
- }
③具体实现
- @Override
- public Result queryBlogOfFollow(Long max, Integer offset) {
- //1.获取当前用户
- Long userId = UserHolder.getUser().getId();
- //2.查询收件箱
- String key=FEED_KEY+userId;
- Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
- //3.非空判断
- if(typedTuples==null || typedTuples.isEmpty()){
- return Result.ok();
- }
- //4.解析数据:blogId,minTime(时间戳),offset
- ArrayList<Object> ids = new ArrayList<>(typedTuples.size());
- long minTime=0;
- int os=1;
- for(ZSetOperations.TypedTuple<String> tuple:typedTuples) {
- //4.1 获取id
- ids.add(Long.valueOf(tuple.getValue()));
- //4.2 获取分数(时间戳)
- long time = tuple.getScore().longValue();
- if (time == minTime) {
- os++;
- } else {
- minTime = time;
- os = 1;
- }
- }
- //5.根据id查询blog
- String idStr = StrUtil.join(",", ids);
- List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
- for (Blog blog : blogs) {
- //5.1 查询blog有关的用户
- queryBlogUser(blog);
- //5.2 查询blog是否被点赞
- isBlogLiked(blog);
- }
- //6.封装并返回
- ScrollResult r = new ScrollResult();
- r.setList(blogs);
- r.setOffset(os);
- r.setMinTime(minTime);
- return Result.ok(r);
- }
案例:练习Redis的GEO功能
需求:
1.添加下面几条数据:
北京南站(116.378248 39.865275)
北京站(116.42803 39.903738)
北京西站(116.322287 39.893729)
查看redis客户端:
==>geo底层采用ZSET实现
2.计算北京西站到北京南站的距离
==>默认单位是米
3.搜索天安门(116.397904 39.909005)附近10km内的所有火车站,并按照距离升序排序
按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可
导入数据(通过单元测试):
- @Test
- void loadShopData(){
- //1.查询店铺信息
- List<Shop> list = shopService.list();
- //2.把店铺按照typeId分组,id一致的放到一个集合
- Map<Long,List<Shop>> map=list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
- //3.分批完成写入Redis
- for(Map.Entry<Long,List<Shop>> entry:map.entrySet()){
- //3.1 获取类型id
- Long typeId = entry.getKey();
- String key="shop:geo:"+typeId;
- //3.2 获取同类型的店铺集合
- List<Shop> value = entry.getValue();
- //3.3 写入redis
- List<RedisGeoCommands.GeoLocation<String>> locations=new ArrayList<>(value.size());
- for(Shop shop:value){
- locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(),shop.getY())));
- }
- stringRedisTemplate.opsForGeo().add(key,locations);
- }
- }
首先,项目中redis相关的依赖版本太低,不支持GEOSEARCH方法,需要更换版本
我们将5.3.7和1.3.9版本排除,观察代码:
然后手动引入新版本:
- <dependency>
- <groupId>org.springframework.data</groupId>
- <artifactId>spring-data-redis</artifactId>
- <version>2.6.2</version>
- </dependency>
- <dependency>
- <groupId>io.lettuce</groupId>
- <artifactId>lettuce-core</artifactId>
- <version>6.1.6.RELEASE</version>
- </dependency>
接口:
- /**
- * 根据商铺类型分页查询商铺信息
- * @param typeId 商铺类型
- * @param current 页码
- * @return 商铺列表
- */
- @GetMapping("/of/type")
- public Result queryShopByType(
- @RequestParam("typeId") Integer typeId,
- @RequestParam(value = "current", defaultValue = "1") Integer current,
- @RequestParam(value = "x",required = false) Double x,
- @RequestParam(value = "y",required = false) Double y
- ) {
- return shopService.queryShopByType(typeId,current,x,y);
- }
具体实现:
- @Override
- public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
- //1.判断是否需要根据坐标查询
- if(x==null || y==null){
- Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, DEFAULT_PAGE_SIZE));
- return Result.ok(page.getRecords());
- }
- //2.计算分页参数
- int from=(current-1)*DEFAULT_PAGE_SIZE;
- int end=current*DEFAULT_PAGE_SIZE;
- //3.查询redis、按照距离排序、分页 结果:shopId、distance
- String key=SHOP_GEO_KEY+typeId;
- GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
- .search(
- key,
- GeoReference.fromCoordinate(x, y),
- new Distance(5000),
- RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
- );
- //4.解析出id
- if(results==null){
- return Result.ok(Collections.emptyList());
- }
- List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
- if(list.size()<=from){
- return Result.ok(Collections.emptyList());
- }
- //4.1 截取from ~ end的那部分
- List<Long> ids=new ArrayList<>(list.size());
- Map<String,Distance> distanceMap=new HashMap<>(list.size());
- list.stream().skip(from).forEach(result->{
- //4.2 获取店铺id
- String shopIdStr = result.getContent().getName();
- ids.add(Long.valueOf(shopIdStr));
- //4.3 获取距离
- Distance distance = result.getDistance();
- distanceMap.put(shopIdStr,distance);
- });
- //5.根据id查询店铺Shop
- String idStr = StrUtil.join(",", ids);
- List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
- for(Shop shop:shops){
- shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
- }
- //6.返回
- return Result.ok(shops);
- }
效果:
注意,在stream流中跳过了一部分商铺,很有可能导致跳过以后没有商铺可查,从而出现问题,所以需要加上如下判断:
- if(list.size()<=from){
- return Result.ok(Collections.emptyList());
- }
我们按月来统计用户签到信息,签到记录为1,未签到则记录为0
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)
Redis中就是利用String类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是2^32个bit位。
练习:
BITFIELD bm1 GET u2 0含义:读取bm1,以无符号十进制的方式读取两位(u2),从第一位开始读取。返回值3代表读取的比特位是11
BITPOS bm1 0 :代表查找第一个0出现的位置
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
提示:因为BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了。(Redis的字符串)
接口:
- @PostMapping("/sign")
- public Result sign(){
- return userService.sign();
- }
具体实现:
- @Override
- public Result sign() {
- //1.获取当前登录的用户
- Long userId = UserHolder.getUser().getId();
- //2.获取日期
- LocalDateTime now = LocalDateTime.now();
- //3.拼接key
- String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
- String key=USER_SIGN_KEY+userId+keySuffix;
- //4.获取今天是本月的第几天
- int dayOfMonth=now.getDayOfMonth();
- //5.写入Redis
- stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
- return Result.ok();
- }
测试:
Q1:什么叫做连续签到天数?
A1:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。
Q2:如何得到本月到今天为止的所有签到数据?
BITFIELD key GET u[dayOfMonth] 0
Q3:如何从后向前遍历每个bit位?
与1做与运算,就能得到最后一个bit位
随后右移1位,下一个bit位就成为了最后一个bit位
案例:实现签到统计功能
需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数
接口:
- @GetMapping("/sign/count")
- public Result signCount(){
- return userService.signCount();
- }
具体实现:
- @Override
- public Result signCount() {
- //1.获取当前登录用户
- Long userId = UserHolder.getUser().getId();
- //2.获取日期
- LocalDateTime now = LocalDateTime.now();
- //3.拼接key
- String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
- String key=USER_SIGN_KEY+userId+keySuffix;
- //4.获取今天是本月的第几天
- int dayOfMonth = now.getDayOfMonth();
- //5.获取本月截止今天为止的所有签到记录,返回的是一个十进制的数字
- List<Long> result = stringRedisTemplate.opsForValue().bitField(key,
- BitFieldSubCommands
- .create()
- .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth))
- .valueAt(0));
- if(result==null || result.isEmpty()){
- //没有任何签到结果
- return Result.ok(0);
- }
- Long num = result.get(0);
- if(num==null || num==0){
- return Result.ok(0);
- }
- //6.循环遍历
- int count=0;
- while(true){
- //6.1 让这个数字与1做与运算,得到数字的最后一个bit位
- if((num&1)==0){
- //如果为0,说明未签到,结束
- break;
- }else{
- //如果不为0,说明已签到,计数器+1
- count++;
- }
- num >>>= 1; // >>>代表无符号右移
- }
- return Result.ok(count);
- }
测试:返回值为3
实际上也确实是3个连续的1:
测试通过!
UV:全称Unique Visitor,也叫独立访问量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次
PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
UV统计在服务端做会比较麻烦,因为要判断用户是否已经统计过了,要将统计过的用户信息保存,但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。
命令练习:
我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何:
- @Test
- void testHyperLogLog(){
- String[] values=new String[1000];
- int j=0;
- for (int i = 0; i < 1000000; i++) {
- j=i%1000;
- values[j]="user_"+i;
- if(j==999){
- //发送到Redis
- stringRedisTemplate.opsForHyperLogLog().add("hl2",values);
- }
- }
- //统计数量
- Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
- System.out.println("count="+count);
- }
运行结果:
内存消耗:
运行前内存:
运行后内存:
1929640-1505552= 424088
424088/1024/1024=0.4M
Redis实战篇完结,恭喜大家~~❀
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。