赞
踩
- public Result sendCode(String phone, HttpSession session) {
- //1.校验验证码
- if(RegexUtils.isPhoneInvalid(phone)){
- //2.错误则直接返回
- return Result.fail("手机号格式错误");
- }
-
- //3. 生成验证码
- String code = RandomUtil.randomNumbers(6);
- //4.保存验证码到redis
- stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);
- //5.发送验证码
- log.debug("验证码为:{}", code);
- //6.
- return Result.ok();
- }
验证码登录逻辑:
1.校验提交过来的手机号是否为空
2.获取表单提交过来的手机号和验证码
3.根据key获取redis存的验证码
4.验证码匹配判断
5.不匹配,直接返回提示信息
6.匹配,根据phone查询用户信息
7.判断用户是否存在
8.不存在,创建该用户到数据库中
9.将用户信息存入redis
重点来了
9.1生成随机的token
9.2将User对象转为HashMap存储(用户后续以hash形式在redis中保存)
9.3存入redis中
9.4设置key有效时间
注意:这里设置key的有效时间是死的,就是不管如何,到时间就key就失效,而我们想要的效果是和session一样,每次登录后,key的有效时间就刷新,只有一直不登录时间达到了设置的时间才会失效(这项功能在拦截器里去实现)
10.返回token
- //7.保存用户信息到redis
- //7.1 使用UUID生成token,作为用户登录令牌
- String token = UUID.randomUUID(true).toString(true);
- //7.2 将user转为map
- UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
- Map<String, Object> map = BeanUtil.beanToMap(userDTO, new HashMap<>(),
- CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName, fieldValue) ->
- fieldValue.toString()));
- //7.3 存储
- String userToken = LOGIN_USER_KEY + token;
- stringRedisTemplate.opsForHash().putAll(userToken, map);
- //7.4 设置token有效期
- stringRedisTemplate.expire(userToken, LOGIN_USER_TTL, TimeUnit.MINUTES);
- return Result.ok(token);
前端发送的请求需要把登录用户的基本信息封装传递过去,获取当前登录的用户并返回
如何获取登录用户的信息?
使用ThreadLocal。登录时将用户信息存入UserHolder,验证时从中取出再验证
- public class UserHolder {
- private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
- public static void saveUser(UserDTO user){
- tl.set(user);
- }
- public static UserDTO getUser(){
- return tl.get();
- }
- public static void removeUser(){
- tl.remove();
- }
- }
如果直接传递一个user对象,会造成信息泄露,用户的所有信息随着请求提交都可以在负载中看到
创建一个userDto,只定义几个简单属性,只显示一些公开字段,不会泄露密码等敏感信息,返回的时候也是返回userDto对象。
集群模式下session共享存在的问题:由于session不适合集群模式,就是好几个tomcat服务器,运行时采用轮询策略,每个请求处理的服务器不同,不可能每个服务器都存一份session的用户数据,同份数据存多次,既造成了访问延迟数据更新不及时,也造成了内存浪费。而redis是非常适合的,redis支持横向扩展。
使用redis代替session作为缓存后,无法从客户端发送的请求来获取key,在不明确key的情况下,无法获取存入的value值,如下是使用redis缓存来代替session存user
我们使用token来作为唯一key值,token是自动生成且不重复的值(前端做了处理:请求时请求头会带上token信息)
至此我们获取到了对象信息,但是不只一个业务需要获取登录用户的信息,比如发布博客,发表评论,都需要获取用户名称等信息。我们也不可能每个业务都去添加这段代码,要想再方法执行前统一做一些操作,就用到了我们的拦截器,也可以拦截未登录用户。
如果要使用redis作为缓存对象信息,首先拦截器要获取到stringRedisTemplate对象,才能调用缓存api
这里不能使用autowried或resource的原因,这个类的对象是在注入拦截器时,自己new出来的,不是由spring创建的,用构造器注入
而注入拦截器的MvcConfig是由spring构建的,它可以使用自动装配,所以在这里获取stringRedisTemplate,在new的时候作为形参传进
- @Configuration
- public class mvcConfig implements WebMvcConfigurer {
- @Resource
- private StringRedisTemplate stringRedisTemplate;
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- //拦截器顺序为RefreshTokenInterceptor->LoginInterceptor
- registry.addInterceptor(new LoginInterceptor())
- .excludePathPatterns(
- "/shop/**",
- "/voucher/**",
- "/shop-type/**",
- "/upload/**",
- "/blog/hot",
- "/user/code",
- "/user/login"
- ).order(1);
- registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
- }
- }
- public class LoginInterceptor implements HandlerInterceptor {
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- //通过判断ThreadLocal是否有用户判断需要拦截
- if(UserHolder.getUser() == null){
- response.setStatus(401);
- return false;
- }
- return true;
- }
- @Override
- public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
- UserHolder.removeUser();
- }
- }
解决登录状态刷新问题
问题原因,之前是在LoginInterceptor中刷新token,但是该拦截器并不是拦截所有路径,只拦截了那些需要用户登录的路径,这样的话就会存在一个问题:如果一直访问哪些不需要登录就能访问的路径,Token就不会刷新。解决:添加一个拦截所有路径的刷新拦截器,并将原有的LoginInterceptor中除了判断用户是否登录之外的逻辑放到该拦截器中
- public class RefreshTokenInterceptor implements HandlerInterceptor {
-
- private StringRedisTemplate stringRedisTemplate;
- public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
- this.stringRedisTemplate = stringRedisTemplate;
- }
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- //1.获取token
- String token = request.getHeader("authorization");
- if(token == null){
- return true;
- }
- //2.查询用户
- Map<Object, Object> map = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
- //3.保存用户信息到ThreadLocal
- if(map.isEmpty()){
- return true;
- }
- UserDTO userDTO = BeanUtil.fillBeanWithMap(map, new UserDTO(), false);
- UserHolder.saveUser(userDTO);
- //4.刷新token有效期
- stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);
- //5.放行
- return true;
- }
-
- @Override
- public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
- UserHolder.removeUser();
- }
- }
登录验证总结:以前用的是session,用户登录后将数据放在session中,拦截器那边判断登录验证,可以根据请求传来的cookie获取session-id,从而获取数据
使用redis后,就不能根据请求获取key了,这里用到前端的token来作为key,登录时存入数据。首先是刷新token拦截器拦截,若Redis中有该Token,将用户信息存入ThreadLocal,刷新token有效期,进入后续登录拦截器;否则直接到登录拦截器。
登录拦截器做登录验证时,判断ThreadLocal是否有用户,来判断是否拦截
而存入ThreadLocal中,是因为后续业务需要获取到用户信息
还存在的问题:
目前的做法是刷新拦截器只要拦截到请求,就会对Redis中的LOGIN_USER_KEY+token进行续期并将userDTO存入ThreadLocal中。这样的话设置的过期时间没什么用(因为会一直刷新)
解决方法:设置一个阈值,剩余过期时间少于该阈值时进行刷新操作。
为什么要采用即将过期这种做法?假设设置成剩余过期时间为0时才进行刷新操作,这样的话可能存在刷新不上的问题()
在查询一个商家后,将查询到的信息缓存到redis中,方便后续查询
- public Result queryById(Long id) {
- //1. 从redis查询商铺缓存, 可以用sting,也可以用hash
- String key = CACHE_SHOP_KEY + id;
- String shopJson = stringRedisTemplate.opsForValue().get(key);
- //2. 存在,直接返回
- if(StrUtil.isNotBlank(shopJson)){
- Shop shop = JSONUtil.toBean(shopJson, Shop.class);
- return Result.ok(shop);
- }
- //3. 缓存不存在,查询数据库
- Shop shop = getById(id);
- //4. 数据库中不存在,返回错误
- if(shop == null){
- return Result.fail("店铺不存在!");
- }
- //5. 数据库中存在, 写入redis
- stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
- stringRedisTemplate.expire(key, CACHE_SHOP_TTL, TimeUnit.MINUTES);
- //6. 返回
- return Result.ok(shop);
- }
首页的这块列表信息是不变动的,因此我们可以将它存入缓存中,避免每次访问时都去查询数据库
这里使用 List结构(其他的类型如String和Zset都可以)存储商铺类型,并设为永不过期
- @Service
- public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {
- @Resource
- private StringRedisTemplate stringRedisTemplate;
-
- @Override
- public Result queryTypeList() {
- //1.查询redis
- String key = CACHE_SHOP_KEY + "TypeList";
- List<String> jsonShopTypeList = stringRedisTemplate.opsForList().range(key, 0, -1);
- //2.存在则直接返回
- if(jsonShopTypeList != null && !jsonShopTypeList.isEmpty()){
- //将string转成shopType类型
- ArrayList<ShopType> shopTypeList = new ArrayList<>();
- for(String json : jsonShopTypeList){
- shopTypeList.add(JSONUtil.toBean(json, ShopType.class));
- }
- return Result.ok(shopTypeList);
- }
- //3.不存在,查询数据库
- List<ShopType> shopTypeList = query().orderByAsc("sort").list();
- //4.数据库中不存在
- if(shopTypeList == null){
- return Result.fail("不存在店铺类型列表");
- }
- //5.加入缓存
- for(ShopType shopType : shopTypeList){
- stringRedisTemplate.opsForList().rightPush(key, JSONUtil.toJsonStr(shopType));
- }
- //5.返回
- return Result.ok(shopTypeList);
- }
- }
同时操作数据库和缓存,需要加入事务,同成功同失败
先更新数据库,再删除缓存,下次查询就把更新的数据存入缓存。因为数据库更新的时间是较长的,而删除缓存,写入缓存是很快速的
在多线程并发情况下,若使用先删除缓存,再更新数据库的策略,这时注意,在该线程更新数据库时,另一线程进行查询操作,又把旧数据写入到缓存了,后续再查询,直接命中缓存,不查询数据库,造成缓存和数据库不一致。
扩展:
Cache Aside Pattern 的缺陷。
缺陷 1:首次请求数据一定不在 cache 的问题
解决办法:可以将热点数据可以提前放入 cache 中。
缺陷 2:写操作比较频繁的话导致 cache 中的数据会被频繁被删除,这样会影响缓存命中率 。
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案:
当数据库查询出的数据为空时,返回错误信息还要把空对象存入缓存中,并设置较短的有效时间。用户再次发送该请求时,直接命中redis缓存的空对象,返回错误信息,不再向下查询数据库,降低数据库压力。
优点:实现简单,维护方便; 缺点:额外的内存消耗,可能造成短期的不一致
优点:内存占用较少,没有多余key; 缺点:实现复杂,存在误判可能
这里选择使用缓存空值实现
- /**
- *下面的函数式编程有点抽象
- *在service层的具体调用为如下:
- *Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
- */
- public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallBack, Long time, TimeUnit timeUnit){
- //1. 从redis查询商铺缓存, 可以用sting,也可以用hash
- String key = keyPrefix + id;
- String strJson = stringRedisTemplate.opsForValue().get(key);
- //2. 存在,判断是否存在
- if(StrUtil.isNotBlank(strJson)){
- R r = JSONUtil.toBean(strJson, type);
- return r;
- }
- //此时还剩两种情况,null和""
- //判断redis查询到的是否为空值
- if(strJson != null){
- return null;
- }
- //3. 不存在,查询数据库
- R r = dbFallBack.apply(id);
- if(r == null){//4. 数据库中不存在,redis缓存空值
- stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
- return null;
- }
- //5. 数据库中存在, 写入redis
- stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(r), time, timeUnit);
- //6. 返回
- return r;
- }
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案:互斥锁和逻辑过期
两种方案的侧重点不同
利用Redis中的setnx(setIfAbsent)来实现互斥锁
- private boolean tryLock(String key){
- Boolean absent = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
- return BooleanUtil.isTrue(absent);
- }
- private void unLock(String key){
- stringRedisTemplate.delete(key);
- }
- public Shop queryWithMutex(Long id){
- //1. 从redis查询商铺缓存, 可以用sting,也可以用hash
- String key = CACHE_SHOP_KEY + id;
- String shopJson = stringRedisTemplate.opsForValue().get(key);
- //2. 存在,判断是否为空
- if(StrUtil.isNotBlank(shopJson)){
- //3.命中,直接返回
- Shop shop = JSONUtil.toBean(shopJson, Shop.class);
- return shop;
- }
- //此时还剩两种情况,null和""
- //判断redis查询到的是否为空值
- if(shopJson != null){
- return null;
- }
- //4. 不存在,尝试获取互斥锁
- String lockKey = LOCK_SHOP_KEY + id;
- boolean isLock = tryLock(lockKey);
- Shop shop;
- try {
- if(!isLock){
- //4.1 获取锁失败,休眠一段时间,重新查询
- Thread.sleep(50);
- return queryWithMutex(id);
- }
- //4.2 获取锁成功
- shop = getById(id);
- if(shop == null){//5. 数据库中不存在,redis缓存空值
- stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
- return null;
- }
- //6. 数据库中存在, 写入redis
- stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
- }catch (InterruptedException e){
- throw new RuntimeException(e);
- }finally {
- //7. 释放锁
- unLock(lockKey);
- }
- //8. 返回
- return shop;
- }
逻辑过期设置的一般不是TTL,设置缓存基本上是一直有效到活动结束后,才移除缓存中数据
以key和JSONUtil.toJsonStr(redisData)作为键值对,在redisData中设置逻辑过期,查询缓存时判断
之所以会逻辑过期,不是因为有效时间,而是因为数据更新了,缓存也需要更新数据,这是逻辑过期。
整理流程:
- public class RedisData {
- private LocalDateTime expireTime;
- private Object data;
- }
- public void setWithLogicExpire(String key, Object data, Long expireTime, TimeUnit timeUnit){
- //设置逻辑过期时间
- RedisData redisData = new RedisData();
- redisData.setData(data);
- redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(expireTime)));
- //写入redis
- stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
- }
- public <R, ID> R queryWithLogicExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallBack, String lockKeyPrefix, Long time, TimeUnit timeUnit){
- //1. 从redis查询商铺缓存, 可以用sting,也可以用hash
- String key = keyPrefix + id;
- String strJson = stringRedisTemplate.opsForValue().get(key);
- //2. 存在,判断是否存在
- //不存在,返回空
- if(StrUtil.isBlank(strJson)) return null;
- //3.存在,取出数据
- RedisData redisData = JSONUtil.toBean(strJson, RedisData.class);
- JSONObject data = (JSONObject) redisData.getData();
- R r = JSONUtil.toBean(data, type);
- LocalDateTime expireTime = redisData.getExpireTime();
- //4 判断是否逻辑过期
- //4.1 未过期
- if(expireTime.isAfter(LocalDateTime.now())){
- return r;
- }
- //过期
- //5.尝试获取锁
- String lockKey = lockKeyPrefix + id;
- boolean isLock = tryLock(lockKey);
- //5.1 获取锁失败
- if(!isLock) return r;
- //5.2 获取锁成功
- //5.3 二次检查是否过期
- if(expireTime.isAfter(LocalDateTime.now())){
- return r;
- }
- //6.开启独立线程, 重建缓存
- CACHE_REBUILD_EXECUTOR.submit(() ->{
- try {
- //7.重建缓存
- this.setWithLogicExpire(key, dbFallBack.apply(id), time, timeUnit);
- }catch (Exception e){
- throw new RuntimeException(e);
- }finally {
- //释放锁
- unLock(lockKey);
- }
- });
- //返回过期的缓存
- return r;
- }
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:给不同的Key的TTL添加随机值、利用Redis集群提高服务的可用性、给缓存业务添加降级限流策略、给业务添加多级缓存
将缓存穿透、缓存击穿、缓存雪崩的各种解决方法封装成CacheClient类
全局id生成器,是一种在分布式系统下用来生成全局唯一ID的工具,满足以下特性:高可用、唯一、高性能、递增性、安全性
为了增加ID的安全性,不直接使用Redis自增的数值,而是拼接一些其它信息:
通过 Redis 的 incr
自增原子命令即可实现对 id 原子顺序递增
- @Component
- public class RedisIdWorker {
- //开始时间戳
- private static final long BEGIN_TIMESTAMP = 1674086400L;
-
- //序列号位数
- private static final int COUNT_BITS = 32;
-
- private StringRedisTemplate stringRedisTemplate;
-
- public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
- this.stringRedisTemplate = stringRedisTemplate;
- }
-
- public long nextId(String ketPrefix){
- //时间戳
- LocalDateTime now = LocalDateTime.now();
- long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
- long timeStamp = nowSecond - BEGIN_TIMESTAMP;
- //序列号
- String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
- Long count = stringRedisTemplate.opsForValue().increment("icr:" + ketPrefix + ":" + date);
- //
- return timeStamp << COUNT_BITS | count;
- }
- }
Redis集群模式下的分布式id一种做法:分成3部分组成:毫秒级时间,redis集群的第多少个节点,每一个redis节点在每一毫秒的自增序列值
下单包括两个部分:修改秒杀优惠券表tb_seckill_voucher、在tb_voucher_order表中添加订单
涉及到多表操作需要添加事务
原因分析:
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:
悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。例如Synchronized、Lock都属于悲观锁
乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。如果没有修改则认为是安全的,自己才更新数据。如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。
悲观锁实现比较简单,操作前获取锁,操作结束才释放锁,让多个线程串行执行,但是你让并发线程串行,效率十分低下。
乐观锁设计
版本号法
CAS法
在该情况下采用CAS法在修改库存时判断库存是否被修改过即可。
- boolean success = seckillVoucherService.update()
- .setSql("stock = stock - 1")
- .eq("voucher_id",voucherId).eq("stock",voucher.getStock())
- .update();
存在的问题:在压力测试下(200个线程1s内执行完)只卖出了20张
原因分析:设置的是stock没有被修改,且每个线程只会尝试一次。假设此时有10个线程同时操作,只有一个会成功。只需要让更改时的条件为查到的库存数大于0即可
- boolean success = seckillVoucherService.
- update().setSql("stock = stock - 1")
- .eq("voucher_id", voucherId)
- .gt("stock", 0)
- .update();
如上,一条语句执行查询和减库存操作,使得多个请求到达数据库时,数据库直接将多个更新语句通过互斥锁串行化执行,保证不会出现超卖。
但是将互斥操作下沉到数据库存在问题:大量的写请求到达数据库,然后串行执行,这样操作的性能差,对数据库的压力也大。在并发数量比较少的情况下,还可以接受,但是如果是高并发的场景,上述方法不可取。要正确解决高并发下的超卖问题,一般需要借助缓存,比如redis,见后续的异步秒杀优化
乐观锁存在的问题:
如果一个变量 V 初次读取的时候是 A 值,并且在准备赋值的时候检查到它仍然是 A 值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回 A,那 CAS 操作就会误认为它从来没有被修改过。这个问题被称为 CAS 操作的 "ABA"问题。
ABA 问题的解决思路是在变量前面追加上版本号或者时间戳。
首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
CAS 经常会用到自旋操作来进行重试,也就是不成功就一直循环执行直到成功。如果长时间不成功,会给 CPU 带来非常大的执行开销。
CAS 只对单个共享变量有效,当操作涉及跨多个共享变量时 CAS 无效。可以使用锁或者利用AtomicReference
类把多个共享变量合并成一个共享变量来操作。
加锁方式,在并发请求时,保证一个用户只能下一单。通过对用户加锁,保证在多线程情况下,多个用户相同时只有一个用户能够操作成功
尤其注意四点:
- synchronized (userId.toString().intern()){
- return createVoucherOrder(voucherId);
- //相当于return this.createVoucherOrder(voucherId);
- }
获得代理对象,对代理对象加锁。整体代码如下(需要在启动类上加上@EnableAspectJAutoProxy(exposeProxy = true))
- public Result seckillVoucher(Long voucherId) {
- //1.查询id为voucherId的秒杀券
- SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
- if(voucher == null){
- //秒杀券不存在
- return Result.fail("该秒杀券不存在!");
- }
- //2.查看秒杀是否开始或结束
- LocalDateTime beginTime = voucher.getBeginTime();
- //2.1 秒杀未开始,返回
- if(beginTime.isAfter(LocalDateTime.now())){
- return Result.fail("该秒杀未开始!");
- }
- //2.2 秒杀已结束,返回
- LocalDateTime endTime = voucher.getEndTime();
- if(endTime.isBefore(LocalDateTime.now())){
- return Result.fail("该秒杀已结束!");
- }
- //3.库存是否充足
- Integer stock = voucher.getStock();
- //库存不足,返回
- if(stock < 1) return Result.fail("该秒杀券已秒杀完!");
- Long userId = UserHolder.getUser().getId();
- //说明:
- //1. 因为要保证的是每个用户只能秒杀成功一次,对整个seckillVoucher加锁的话影响效率
- //2. 选择对用户加锁,这里需要注意UserId为包装类,toString之后也是new的一个String对象,因此需要intern方法
- //3. 对用户加锁的话有两个选择,一种是如下,另一种是在createVoucherOrder中加锁
- //4. 第二种方式的问题:过程是先释放锁再提交事务,这样的话在这个过程中可能会发生其他线程拿到锁但是事务还没提交的情况,存在线程安全问题
- synchronized (userId.toString().intern()){
- //这里需要注意seckillVoucher方法是没有加@Transactional注解的
- //调用带@Transactional注解的createVoucherOrder方法
- //发生了自身调用,没有经过Spring的代理类,事务不会生效
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
- return proxy.createVoucherOrder(voucherId);
- }
- }
-
- @Transactional
- public Result createVoucherOrder(Long voucherId) {
- //4.检查该用户是否已经秒杀成功过
- //用户id
- Long userId = UserHolder.getUser().getId();
- //查询订单
- Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
- if(count > 0) return Result.fail("您已秒杀过该优惠券!");
-
- //5.1 修改库存
- boolean success = seckillVoucherService.
- update().setSql("stock = stock - 1")
- .eq("voucher_id", voucherId)
- .gt("stock", 0)
- .update();
- if(!success) return Result.fail("秒杀失败!");
- //5.2 生成秒杀订单
- VoucherOrder voucherOrder = new VoucherOrder();
- //订单id
- long orderId = redisIdWorker.nextId("order");
- voucherOrder.setId(orderId);
- voucherOrder.setUserId(userId);
- //优惠券id
- voucherOrder.setVoucherId(voucherId);
- save(voucherOrder);
-
- return Result.ok(orderId);
- }
上述可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
简单模拟集群:
在集群模式下,非分布式加锁只是该台jvm给当前这台服务器处理的请求加锁,而集群是多台服务器轮询处理请求,每台服务器都有一个加锁的线程,每台服务器都可能会创建一个新订单。
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的特点:多进程可见、互斥、高可用、高性能(高并发)、安全性
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的分布式锁有以下三种
注:阻塞式和非阻塞式均可,这里采用的是阻塞式。为了保证set和expire命令的原子性,使用set key value nx ex命令
Redis分布式锁原理:基于setnx命令–>key存在的情况下,不更新value,而是返回nil。那么利用key是唯一的特性来加锁,比如一人一单业务,key名称精确到userId,那么同一个用户无论发多少次请求,能成功创建键值的只有一个,因为setnx命令,后面的请求在获取锁创建键值就会失败
在多线程情况下,当前锁设计无法区分不同的线程。
解决思路:在释放锁之前判断锁是不是自己的
通过对线程添加线程标识来区分不同的线程。需要注意,因为考虑的是集群的情况,不同的jvm可能存在线程号相同的线程,不能直接用线程ID做标识,在线程ID前加上一个UUID
初级版本的锁实现如下
- public class SimpleRedisLock implements ILock{
- private static final String KEY_PREFIX = "lock:";
- private String name;
- private StringRedisTemplate stringRedisTemplate;
- private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
-
- public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
- this.name = name;
- this.stringRedisTemplate = stringRedisTemplate;
- }
-
- @Override
- public boolean tryLock(long timeOutSec) {
- //线程id
- String threadId = ID_PREFIX + Thread.currentThread().getId();
- Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeOutSec, TimeUnit.SECONDS);
- //这里包装类拆箱的时候可能会出现空指针异常
- return Boolean.TRUE.equals(success);
- }
-
- @Override
- public void unlock() {
- String threadId = ID_PREFIX + Thread.currentThread().getId();
- if(threadId.equals(stringRedisTemplate.opsForValue().get(KEY_PREFIX + name))){ //防止误删
- stringRedisTemplate.delete(KEY_PREFIX + name);
- }
- }
- }
需要保证:判断锁的操作和释放锁的操作得成一个原子性操作,一起执行,要阻塞都阻塞,要通过都通过
通过Lua脚本保证多条命令的原子性。unlock.lua:
为什么lua脚本可以保证原子性?
Redis保证以原子方式执行脚本:执行脚本时不会执行其他脚本或Redis命令。 类似于给执行lua脚本这段代码加了锁 ,可能redis内部实现会有一定的差异,反正大致意思就是这样
Redis使用同一个Lua解释器来执行所有命令,同时,Redis保证以一种原子性的方式来执行脚本:当lua脚本在执行的时候,不会有其他脚本和命令同时执行,这种语义类似于 MULTI/EXEC。从别的客户端的视角来看,一个lua脚本要么不可见,要么已经执行完。
- -- 这里的 KEYS[1] 就是锁的 key,这里的 ARGV[1] 就是当前线程标识
- -- 获取锁中的线程标识 get key
- local id = redis.call('get', KEYS[1]);
- -- 比较线程标识与锁中的标识是否一致
- if (id == ARGV[1]) then
- -- 释放锁 del key
- return redis.call('del', KEYS[1])
- end
- return 0
使用lua脚本改造后的SimpleRedisLock:
- private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
- static {
- UNLOCK_SCRIPT = new DefaultRedisScript<>();
- UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
- UNLOCK_SCRIPT.setResultType(Long.class);
- }
- @Override
- public void unlock() {
- //通过调用lua脚本实现判断和删除操作的原子性
- stringRedisTemplate.execute(UNLOCK_SCRIPT,
- Collections.singletonList(KEY_PREFIX + name),
- ID_PREFIX + Thread.currentThread().getId());
- }
基于 setnx 实现的分布式锁存在下面的问题
1.不可重入:同一个线程无法多次获取同一把锁(第二次尝试拿锁时会返回nil,获取锁失败)
2.不可重试:获取锁只尝试一次就返回 false,没有重试机制
3.超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
4.主从一致性:如果 Redis 提供了主从集群,主从延同步在延迟,当主机宕机时,如果从机同步主机中的数据,则会出现锁失效
导入maven坐标
redisson配置文件
- @Configuration
- public class RedissonConfig {
- @Bean
- public RedissonClient redissonClient(){
- Config config = new Config();
- config.useSingleServer().setAddress("").setPassword("");
- return Redisson.create(config);
- }
- }
Redisson可重入锁原理
基本思想就是先判断锁是否是该线程的,每次获取锁将次数加一,释放锁减一,减到0时删除锁。
Redisson分布式锁原理
Redisson分布式锁原理:
可重入:利用hash结构记录线程id和重入次数
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
超时续约:利用watchDog,每隔一段时间( releaseTime/ 3),重置超时时间。看起来和永不过期没区别,但是锁续期任务线程是和获取锁线程在同一个jvm实例中的,宕机会一起挂掉
总结:
①不可重入Redis 分布式锁
原理:利用 setnx 的互斥性;利用 ex 避免死锁;释放锁时判断线程标示
缺陷:不可重入、无法重试、锁超时失效
②可重入的 Redis 分布式锁
原理:利用 hash 结构,记录线程标示和重入次数;利用 watchDog 延续锁时间;利用信号量控制锁重试等待
缺陷:Redis 宕机引起锁失效问题
③Redisson 的 multiLock连锁
原理:多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂
观察业务流程可以发现,下单的业务流程包含一系列判断和数据库操作,是比较耗费时间的。
异步优化:分离成两个线程,一个线程判断用户的购买资格,用户有购买资格后再开启一个独立的线程来处理耗时较久的减库存、写订单的操作。将耗时较短的两步操作放到 Redis 中,在 Redis 中处理对应的秒杀资格的判断。Redis 的性能是比 MySQL 要好的。此外,还需要引入异步队列记录相关的信息。
需求分解:
1.新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中(将在数据库中判断库存转移到Redis中)
2.基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
3.如果抢购成功,将优惠券 id 和用户 id 封装后存入阻塞队列
4.开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
需求实现:
1.略
2.基于Lua脚本进行预检
- local voucherId = ARGV[1]
- local userId = ARGV[2]
- local stockKey = 'seckill:stock:' .. voucherId
- local orderKey = 'seckill:order:' .. voucherId
- ---1.判断库存是否充足
- if(tonumber(redis.call('get', stockKey)) <= 0) then
- return 1
- end
- ---2.判断用户是否下过单
- if(redis.call('sismember', orderKey, userId) == 1) then
- return 2
- end
- ---3.扣减库存
- redis.call('incrby', stockKey, -1)
- ---4.将UserId存入当前优惠券的set集合
- redis.call('sadd', orderKey, userId)
- return 0
注入脚本
- private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
- static {
- SECKILL_SCRIPT = new DefaultRedisScript<>();
- SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
- SECKILL_SCRIPT.setResultType(Long.class);
- }
3、封装订单,放入阻塞队列
创建阻塞队列
- //创建阻塞队列 这个阻塞队列特点:当一个线程尝试从队列获取元素的时候,如果没有元素该线程阻塞,直到队列中有元素才会被唤醒获取
- private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
预检通过后,保存订单信息,放入阻塞队列
- @Override
- public Result seckillVoucher(Long voucherId) {
- Long userId = UserHolder.getUser().getId();
- //1.执行sekill.lua脚本,进行预检:判断库存是否充足,用户是否下过单
- Long execute = stringRedisTemplate.execute(SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(), userId.toString());
- int result = execute.intValue();
- if(result == 1){
- return Result.fail("该秒杀券已秒杀完!");
- } else if (result == 2) {
- return Result.fail("不允许重复下单!");
- }
- //2.将优惠券id、用户id和订单id存入阻塞队列
- //2.1.生成秒杀订单
- VoucherOrder voucherOrder = new VoucherOrder();
- //订单id
- long voucherOrderId = redisIdWorker.nextId("order");
- voucherOrder.setId(voucherOrderId);
- voucherOrder.setUserId(userId);
- //优惠券id
- voucherOrder.setVoucherId(voucherId);
- //2.2 将订单放入阻塞队列
- orderTasks.add(voucherOrder);
- //2.3 获取动态代理对象
- proxy = (IVoucherOrderService) AopContext.currentProxy();
-
- return Result.ok(voucherOrderId);
- }
4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单
创建线程池
private final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
创建线程任务。需要注意,线程任务需要在用户秒杀订单之前开始,用户一但开始秒杀,队列就会有新的订单,线程任务就应该立即取出订单信息,这里利用spring提供的注解,在类初始化完毕后立即执行线程任务,
- @PostConstruct
- private void init(){
- //因为程序一启动就可能有用户创建订单,因此需要在该类创建后就要初始化VoucherOrderHandler以执行线程任务
- SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
- }
-
- private class VoucherOrderHandler implements Runnable{
- @Override
- public void run() {
- while (true){
- //只有阻塞队列中存在元素才能take到,否则会一直阻塞
- try {
- VoucherOrder voucherOrder = orderTasks.take();
- //执行业务
- handleVoucherOrder(voucherOrder);
- } catch (Exception e) {
- log.info("异常信息", e);
- }
- }
- }
- }
异步下单逻辑,这里的获取锁操作只是做最后的兜底,以防万一,因为前面lua脚本都已经判断过了。
这里需要注意的是,AopContext.currentProxy()是通过ThreadLocal来获取当前代理对象的,而这里是异步下单,异步线程中是获取不到当前代理对象的。因此要在主线程中获取代理对象,1.方法多传入一个参数 2.将代理对象作为类的成员
- private IVoucherOrderService proxy;
- private void handleVoucherOrder(VoucherOrder voucherOrder) {
- Long userId = voucherOrder.getUserId();
- //理论上说在lua脚本中已经保证了并发安全,这里再做一次兜底
- RLock lock = redissonClient.getLock("lock:order:" + userId);
- boolean isLock = lock.tryLock();
- if(!isLock){
- log.info("不允许重复下单");
- }
- //需要注意:观察AopContext.currentProxy()可以发现是通过ThreadLocal来获取当前代理对象的
- //而这里是异步下单,当前线程中是获取不到当前代理对象的
- //两种解决方案:在主线程中获取代理对象,1.方法多传入一个参数 2.将代理对象作为类的成员
- try{
- proxy.createVoucherOrder(voucherOrder);
- }finally {
- lock.unlock();
- }
- }
- @Transactional
- public void createVoucherOrder(VoucherOrder voucherOrder) {
- Long userId = voucherOrder.getUserId();
- //查询订单
- Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
- if(count > 0) {
- log.error("您已秒杀过该优惠券!");
- return;
- }
-
- //2.修改库存
- boolean success = seckillVoucherService.
- update().setSql("stock = stock - 1")
- .eq("voucher_id", voucherOrder.getVoucherId())
- .gt("stock", 0)
- .update();
- if(!success) {
- log.error("库存不足!");
- return;
- }
- //3.创建订单
- save(voucherOrder);
- }
整个优化思路
①编写lua脚本,解决超卖问题和一人一单,超卖用CAS方法判断库存是否大于0,一人一单用redis的set集合的sismenber判读该优惠券(key)下的用户id(value)是否唯一
②Java代码中注入脚本
③若脚本结果为0,代表有购买优惠券资格,将new VoucherOrder创建订单对象,把订单对象放入阻塞队列中,返回订单id给用户。至此主线程任务完毕。
异步下单:
④创建线程池,并定义线程任务,但注意,线程任务必须在方法执行前执行,使用到spring提供的注解在类初始化完成后执行线程任务
⑤线程任务中获取阻塞队列的订单对象,然后调用handleVoucherOrder方法传入voucherOrder
⑥handleVoucherOrder方法其实是再次获取锁,这个就是个纯兜底,作用不大。并在获取锁成功后调用createVoucherOrder方法扣减库存创建订单,由于都是对数据库的操作,因此要提交事务
至此,整个秒杀业务优化完毕
总结
秒杀业务的优化思路是什么?
先利用 Redis 完成库存余量、一人一单判断,完成抢单业务
再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
内存限制问题:我们使用的是JDK里的阻塞队列,是基于JVM的内存,高并发海量请求下造成内存溢出
数据安全问题:服务宕机等情况下内存数据丢失
目前还存在的问题
队列满了怎么办 ?
子线程下单失败怎么办?
订单太多了超过阻塞队列大小了怎么办?
拒绝策略怎么设计?
待消费的消息是否应该持久化,不然宕机了消息不就丢失了?
还有如何确保消息确实被消费成功了,不然消费失败了无法重试
前面的阻塞队列是基于JVM的内存实现,那么不可避免的两个大问题,①高并发海量访问,创建订单,队列很快就超出上限造成内存溢出;②JVM内存没有持久化机制,若服务出现重启或宕机,阻塞队列中的所有任务都会丢失。
MQ是JVM以外的服务,不受JVM内存限制,且MQ中的所有消息会做持久化,这样即使重启或宕机,数据不会丢失。消息投递给消费者后需要消费者确认,未确认消息会一直存在下一次继续投递,确保消息至少被消费一次
4.5.6.1 基于List结构模拟消息队列
如何满足消息保序需求
List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。
List 可以使用 LPUSH + RPOP (或者反过来,RPUSH+LPOP)命令实现消息队列。
在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP
命令(比如使用一个while(1)循环)。如果有新消息写入,RPOP命令就会返回结果,否则,RPOP命令返回空值,再继续循环。
所以,即使没有新消息写入List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。
为了解决这个问题,Redis提供了 BRPOP 命令。BRPOP命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用RPOP命令相比,这种方式能节省CPU开销。
如何处理重复的消息
消费者要实现重复消息的判断,需要 2 个方面的要求:
但是 List 并不会为每个消息生成 ID 号,所以我们需要自行为每个消息生成一个全局唯一ID,生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。
如何保证消息可靠性
List 类型提供了 BRPOPLPUSH
命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。
这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
总结:
优点:利用 Redis 存储,不受限于 JVM 内存上限;基于 Redis 的持久化机制,数据安全性有保证;可以满足消息有序性
缺点:无法避免消息丢失;只支持单消费者
4.5.6.2 基于PubSub的消息队列
消费者可以订阅一个或多个channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。SUBSCRIBE channel [channel] :订阅一个或多个频道 PUBLISH channel msg :向一个频道发送消息 PSUBSCRIBE pattern[pattern] :订阅与 pattern 格式匹配的所有频道
优点:采用发布订阅模型,支持多生产、多消费
缺点:不支持数据持久化;无法避免消息丢失;消息堆积有上限,超出时数据丢失
4.5.6.3 基于Stream的消息队列
单消费模式
发送消息的命令
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value …]
key:队列名称
[NOMKSTREAM]:如果队列不存在时,确定是否自动创建队列,默认自动创建
[MAXLEN|MINID [=|~] threshold [LIMIT count]]:设置消息队列的最大消息数量
|ID:消息的唯一 ID, 代表由 Redis 自动生成,格式是 ”时间戳-递增数字“,例如:”1666161469358-0“
field value [field value …]:发送到队列中的消息,称为 Entry。格式为多个 Key-Value 键值对。
例如:创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用 Redis 自动生成 ID
XADD users * name jack age 21 “1644805700523-0”
读取消息命令
[COUNT count]:每次读取消息的最大数量;
[BLOCK milliseconds]:当没有消息时,确定是否阻塞,阻塞则添加具体的 milliseconds (阻塞时长)
STREAMS key [key …]:从哪个队列读取消息,Key 就是队列名;
ID [ID …]:起始 ID,只返回大于该 ID 的消息;0 代表从第一个消息开始,$ 代表从最新的消息开始。
例如,使用 XREAD 读取第一个消息 XREAD COUNT 1 STREAMS users 0
当我们指定起始 ID 为 $ 时,代表读取最新的消息
如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取时也只能获取到最新的一条
如此便会出现漏读消息的问题
STREAM 类型消息队列的 XREAD 命令特点:
1.消息可回溯(消息永久的保存在消息队列中)
2.一个消息可以被多个消费者读取
3.可以阻塞读取
4.有消息漏读的风险(缺点)
消费者组模式
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
其具备下列特点:
消息分流:队列中的消息会分流给组内不同的消费者,而不是重复消费,从而加快消息处理的速度。
消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,即使消费者宕机重启,还会从标示之后读取消息,确保每一个消息都会被消费。(解决漏读问题)
消息确认:消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list。当处理完成后需要通过 XACK 命令来确认消息,标记消息为已处理,才会从 pending-list 中移除。(解决消息丢失问题)
创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始 ID 标示,$ 代表队列中最后一个消息,0 则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动 ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始 ID:
“>”:从下一个未消费的消息开始
其它:根据指定 id 从 pending-list 中获取已消费但未确认的消息。
例如 0,是从 pending-list 中的第一个消息开始
4.5.6.3 Stream的消息队列异步秒杀下单
需求:
①创建一个 Stream 类型的消息队列,名为 stream.orders
②修改之前的秒杀下单 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包含 voucherId、userId、orderId
③项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单
- local voucherId = ARGV[1]
- local userId = ARGV[2]
- local orderId = ARGV[3]
- local stockKey = 'seckill:stock:' .. voucherId
- local orderKey = 'seckill:order:' .. voucherId
- ---1.判断库存是否充足
- if(tonumber(redis.call('get', stockKey)) <= 0) then
- return 1
- end
- ---2.判断用户是否下过单
- if(redis.call('sismember', orderKey, userId) == 1) then
- return 2
- end
- ---3.扣减库存
- redis.call('incrby', stockKey, -1)
- ---4.将UserId存入当前优惠券的set集合
- redis.call('sadd', orderKey, userId)
- ---5.向消息队列中发送消息: XADD stream.orders * k1 v1 k2 v2 ...
- redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
- return 0
- private class VoucherOrderHandler implements Runnable{
- String messageQueue = "stream.orders";
- @Override
- public void run() {
- while (true){
- try {
- //1.从消息队列中取出消息 xreadgroup group g1 c1 count 1 block 2000 streams stream.orders
- List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
- Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
- StreamOffset.create(messageQueue, ReadOffset.lastConsumed()));
- //2.判断是否获取成功
- if(records == null || records.isEmpty()) continue;
- //3.解析消息中的订单信息
- MapRecord<String, Object, Object> record = records.get(0);
- Map<Object, Object> values = record.getValue();
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
- //4.执行下单
- handleVoucherOrder(voucherOrder);
- //5.确认消息
- stringRedisTemplate.opsForStream().acknowledge(messageQueue, "g1", record.getId());
- } catch (Exception e) {
- handlePendingList();
- log.info("处理订单异常", e);
- }
- }
- }
-
- private void handlePendingList() {
- while (true){
- try {
- //1.pending-list取出消息
- List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
- Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1),
- StreamOffset.create(messageQueue, ReadOffset.from("0")));
- //2.判断是否获取成功
- if(records == null || records.isEmpty()) {
- //获取失败说明没有异常消息
- break;
- }
- //3.解析消息中的订单信息
- MapRecord<String, Object, Object> record = records.get(0);
- Map<Object, Object> values = record.getValue();
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
- //4.执行下单
- handleVoucherOrder(voucherOrder);
- //5.确认消息
- stringRedisTemplate.opsForStream().acknowledge(messageQueue, "g1", record.getId());
- } catch (Exception e) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException ex) {
- throw new RuntimeException(ex);
- }
- log.info("处理pending-list订单异常", e);
- //因为while(true)在try-catch外面,不需要递归调用
- }
- }
- }
- }
涉及基于List的点赞列表
基于SortedSet的点赞排行榜
- //controll层
- @PostMapping
- public Result saveBlog(@RequestBody Blog blog) {
- return blogService.saveBlog(blog);
- }
-
- //service层
- @Override
- public Result saveBlog(Blog blog) {
- // 获取登录用户
- UserDTO user = UserHolder.getUser();
- blog.setUserId(user.getId());
- // 保存探店博文
- save(blog);
- // 返回id
- return Result.ok(blog.getId());
- }
查看博客时,除了需要查询blog表外,还需要显示发布博客的用户名称和头像
有两种做法,一种是使用Dto对象,把两表查询结果放入dto,然后返回dto对象;第二种是直接在Blog实体类,添加用户头像和用户名称两个属性,并加上mp提供的注解@TableField(exist = false) 表示该属性不属于表中字段
- private void queryBlogUser(Blog blog) {
- Long userId = blog.getUserId();
- User user = userService.getById(userId);
- blog.setIcon(user.getIcon());
- blog.setName(user.getNickName());
- }
- @Override
- public Result queryBlogById(Long id) {
- //1.查询blog
- Blog blog = getById(id);
- if(blog == null){
- return Result.fail("该blog不存在!");
- }
- //2.查询用户
- queryBlogUser(blog);
- //3.查询是否点赞
- UserDTO user = UserHolder.getUser();
- if(user != null){
- //当用户不为空时
- isBlogLike(blog);
- }
- return Result.ok(blog);
- }
需求分析:
同一个用户只能点赞一次,再次点击则取消点赞
如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
实现步骤:
给Blog类中添加一个isLike字段,标示是否被当前用户点赞
修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
使用redis的set集合,key为blog的id,value为user的id,用set的ismembet方法判断,当前集合是否有userId,来判读该博客,用户是否已经点赞过了。每个key代表每条博客,每个key下的value集合代表所有点赞的用户id集合。
点赞功能
- public Result likeBlog(Long id) {
- //对blog点赞
- /**
- * 要实现一篇blog一个人只能点赞一次,就需要知道当前Blog被哪些用户点赞过,而且登录用户需要知道自己是否点赞过
- * 最直接的做法是在数据库中维护一张表,记录每个用户点赞的blog,性能较差
- * 采用:在redis中以blogId为key,set为值,set中存放userId
- */
- //1.获取当前用户
- UserDTO user = UserHolder.getUser();
- if(user==null) return Result.fail("请先登录!");
- Long userId = user.getId();
- //2.判断是否点赞过
- String key = BLOG_LIKED_KEY + id;
- Boolean isLike = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
- //3.1 点赞过
- if(BooleanUtil.isTrue(isLike)){
- //取消点赞并在set中移除该用户
- boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
- if(isSuccess){
- stringRedisTemplate.opsForSet().remove(key, userId.toString());
- }
- }else{
- //点赞并在set中添加该用户
- boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
- if(isSuccess){
- stringRedisTemplate.opsForSet().add(key, userId.toString());
- }
- }
- return Result.ok();
- }
修改首页博客和根据id查询博客
- @Override
- public Result queryBlogById(Long id) {
- //1.查询blog
- Blog blog = getById(id);
- if(blog == null){
- return Result.fail("该blog不存在!");
- }
- //2.查询用户
- queryBlogUser(blog);
- //3.查询是否点赞
- UserDTO user = UserHolder.getUser();
- if(user != null){
- //当用户不为空时
- isBlogLike(blog);
- }
- return Result.ok(blog);
- }
-
- @Override
- public Result queryHotBlog(Integer current) {
- // 根据用户查询
- Page<Blog> page = query()
- .orderByDesc("liked")
- .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
- // 获取当前页数据
- List<Blog> records = page.getRecords();
- // 查询用户
- records.forEach(blog -> {
- this.queryBlogUser(blog);
- this.isBlogLike(blog);
- });
- return Result.ok(records);
- }
抽象出的共同方法
- private void queryBlogUser(Blog blog) {
- Long userId = blog.getUserId();
- User user = userService.getById(userId);
- blog.setIcon(user.getIcon());
- blog.setName(user.getNickName());
- }
-
- private void isBlogLike(Blog blog) {
- UserDTO user = UserHolder.getUser();
- if(user != null){
- //当用户不为空时
- Long userId = user.getId();
- String key = BLOG_LIKED_KEY + blog.getId();
- Boolean isLike = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
- blog.setIsLike(BooleanUtil.isTrue(isLike));
- }
使用sortedSet来实现排行榜功能
1. 使用sortedSet改造之前的基于set实现的点赞功能(sortedSet没有Ismember这种方法,通过判断取出的键的分数是否为空来判断)
2. 这里需要注意,因为mysql自己会做优化,类似于where id in(5,1) 会被优化为(1,5),所以要用 where id in(5,1) order by field(id, 5, 1)
- @Override
- public Result queryBlogLikes(Long id) {
- String key = BLOG_LIKED_KEY + id;
- //1. 查询top5的点赞用户
- Set<String> top5ByTime = stringRedisTemplate.opsForZSet().range(key, 0, 4);
- //2. 取出其中的用户id
- if(top5ByTime == null || top5ByTime.isEmpty()){
- return Result.ok(Collections.emptyList());
- }
- List<Long> ids = top5ByTime.stream().map(Long::valueOf).collect(Collectors.toList());
- //3. 根据用户id查询
- String idStr = StrUtil.join(",", ids);
- List<UserDTO> userDTOS = userService.query()
- .in("id", ids)
- .last("order by FIELD(id," + idStr + ")")
- .list()
- .stream()
- .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
- .collect(Collectors.toList());
- //4.返回
- return Result.ok(userDTOS);
- }
基于Set集合的关注,取关,共同关注,消息推送等功能
使用tb_follow表保存关注信息
逻辑很简单,就是表中插入一条数据和删除一条数据
- // UserController 根据id查询用户
-
- @GetMapping("/{id}")
- public Result queryUserById(@PathVariable("id") Long userId){
- // 查询详情
- User user = userService.getById(userId);
- if (user == null) {
- return Result.ok();
- }
- UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
- // 返回
- return Result.ok(userDTO);
- }
-
- // BlogController
- @GetMapping("/of/user")
- public Result queryBlogByUserId(
- @RequestParam(value = "current", defaultValue = "1") Integer current,
- @RequestParam("id") Long id) {
- // 根据用户查询
- Page<Blog> page = blogService.query()
- .eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
- // 获取当前页数据
- List<Blog> records = page.getRecords();
- return Result.ok(records);
- }
在保存关注信息时,在redis中使用set存储用户的所有关注
利用set的交集实现共同关注
- @Override
- @Transactional
- public Result followById(Long followUserId, Boolean isFollow) {
- UserDTO user = UserHolder.getUser();
- Long userId = user.getId();
- String key = FOLLOW_USER_ID + userId;
- if(isFollow){
- //关注用户
- Follow follow = new Follow();
- follow.setUserId(userId);
- follow.setFollowUserId(followUserId);
- boolean isSuccess = save(follow);
- if(isSuccess){
- //将关注信息保存到Redis中
- stringRedisTemplate.opsForSet().add(key, followUserId.toString());
- }
- }else{
- //取关
- LambdaQueryWrapper<Follow> wrapper = new LambdaQueryWrapper<>();
- wrapper.eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId);
- boolean isSuccess = remove(wrapper);
- if(isSuccess){
- stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
- }
- }
- return Result.ok();
- }
-
- @Override
- public Result isFollow(Long followUserId) {
- UserDTO user = UserHolder.getUser();
- Long userId = user.getId();
- LambdaQueryWrapper<Follow> wrapper = new LambdaQueryWrapper<>();
- wrapper.eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId);
- int count = count(wrapper);
- return Result.ok(count > 0);
- }
-
- @Override
- public Result common(Long userId) {
- Long currentUserId = UserHolder.getUser().getId();
- String key1 = FOLLOW_USER_ID + userId;
- String key2 = FOLLOW_USER_ID + currentUserId;
- //获取两者的交集
- Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
- if(intersect == null || intersect.isEmpty()){
- return Result.ok(Collections.emptyList());
- }
- //获取ids
- List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
- //根据ids获取userDto
- List<UserDTO> userDTOS = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
- return Result.ok(userDTOS);
- }
关注推送也叫做 Feed 流,直译为投喂。为用户持续的提供 “沉浸式” 的体验,通过无限下拉刷新获取新的信息。
Feed 流产品有两种常见模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
优点:信息全面,不会有缺失。并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
例如抖音,快手
优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能起到反作用
本例中的个人页面,是基于关注的好友来做 Feed 流,采用 Timeline 的模式。
该模式的实现方案有三种:拉模式、推模式、推拉结合
拉模式:也叫做读扩散
每次读的时候获取消息,内存消耗小,但读操作过于频繁,若用户关注了许多博主,一次要读的消息也是十分多,造成延迟较高
推模式:也叫做写扩散。
发消息时写入粉丝收件箱,内存占用更高,写操作频繁,若博主有许多粉丝,写操作更加繁重
推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。
普通博主,粉丝少,可以采用推模式,写操作并不是很繁重
大v博主,粉丝多;分两种粉丝,活跃粉,普通粉;活跃粉,数量少,可以采用推模式;普通粉,数量多,但上线查看少,采用拉模式,什么时候看什么时候拉取。
①修改新增探店笔记的业务,在保存 blog 到数据库的同时,推送到粉丝的收件箱
②收件箱满足可以根据时间戳排序,用 Redis 的数据结构实现
③查询收件箱数据时,可以实现分页查询
Feed 流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式(会导致重复读取)。
- @Override
- public Result saveBlog(Blog blog) {
- // 1. 获取登录用户
- Long userId = UserHolder.getUser().getId();
- blog.setUserId(userId);
- // 2. 保存探店博文
- boolean isSuccess = save(blog);
- if(!isSuccess){
- return Result.fail("发布失败,请重试!");
- }
- // 3. 推送到粉丝收件箱
- // 3.1 获取所有粉丝
- List<Follow> follows = followService.query().eq("follow_user_id", userId).list();
- if(follows == null || follows.isEmpty()){
- // 返回id
- return Result.ok(blog.getId());
- }
- //3.2 向每位粉丝的收件箱推送blog
- for(Follow follow : follows){
- String key = FEED_KEY + follow.getUserId();
- stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
- }
- // 返回id
- return Result.ok(blog.getId());
- }
分页查询中的几个查询参数需要注意
redis中分析滚动查询
- @Override
- public Result queryFollowBlog(Long max, Long offset) {
- //1. 获取登录用户
- Long userId = UserHolder.getUser().getId();
- //2. 分页查询收件箱内容 ZREVRANGEBYSCORE key Max Min LIMIT offset count limit是小于等于的意思,小于等于查询的最后时间戳
- String key = FEED_KEY + userId;
- Set<ZSetOperations.TypedTuple<String>> blogIdAndScores = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 3);
- //2.1 没有关注内容
- if(blogIdAndScores == null || blogIdAndScores.isEmpty()){
- return Result.ok(Collections.emptyList());
- }
- //2.2 解析数据 blogId,minTime(时间戳), offset
- List<Long> blogIds = new ArrayList<>(blogIdAndScores.size());
- long minTime = 0;
- int count = 1; //score等于minTime的记录的条数
- for(ZSetOperations.TypedTuple<String> blogIdAndScore : blogIdAndScores){
- blogIds.add(Long.valueOf(blogIdAndScore.getValue()));
- long score = blogIdAndScore.getScore().longValue();
- if(score == minTime){
- count++;
- }else{
- minTime = score;
- count = 1;
- }
- }
- //2.3 查询blog
- String idStr = StrUtil.join(",", blogIds);
- List<Blog> blogs = query().in("id", blogIds).last("order by field(id," + idStr + ")").list();
- for(Blog blog : blogs){
- //查询用户
- queryBlogUser(blog);
- //查询是否点赞
- isBlogLike(blog);
- }
-
- ScrollResult r = new ScrollResult();
- r.setList(blogs);
- r.setOffset(count);
- r.setMinTime(minTime);
- return Result.ok(r);
- }
GEO 就是 Geolocation 的简写形式,代表地理坐标。
Redis 在 3.2 版本中加入了对 GEO 的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。
常见的命令有:
GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
GEODIST:计算指定的两个点之间的距离并返回
GEOHASH:将指定 member 的坐标转为 hash 字符串形式并返回
GEOPOS:返回指定member的坐标
GEORADIUS:指定圆心、半径,找到该圆内包含的所有 member,并按照与圆心之间的距离排序后返回。6.2 以后已废弃
GEOSEARCH:在指定范围内搜索 member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2 新功能
GEOSEARCHSTORE:与 GEOSEARCH 功能一致,不过可以把结果存储到一个指定的 key。 6.2.新功能
按照商户类型做分组,类型相同的商户作为同一组,以typeld为key存入同一个GEO集合中即可
- @Test
- void loadShopData() {
- List<Shop> shopList = shopService.list();
-
- //按照type分类
- Map<Long, List<Shop>> map = shopList.stream().collect(Collectors.groupingBy(Shop::getTypeId));
- //分批写入
- for(Map.Entry<Long, List<Shop>> entry : map.entrySet()){
- Long typeId = entry.getKey();
- String key = "shop:geo:" + typeId;
- List<Shop> shops = entry.getValue();
- List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(shops.size());
-
- for(Shop shop : shops){
- locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(), new Point(shop.getX(), shop.getY())));
- }
- stringRedisTemplate.opsForGeo().add(key, locations);
- }
- }
功能逻辑很直接,
1. 判断输入参数是否需要查询附近
2.计算分页参数,根据typeId在Redis中查询
3.返回结果不为空时,对返回结果处理
3.1 GEO中我们存的只有id,最后查询还是要落到数据库上,所以要先取出id
3.2 由于分页,我们需要截取结果中的指定部分,这里注意截取之后的结果可能为空
3.3. 为取出的shop赋予distance属性
4. 返回结果
- @Override
- public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
- //1. 判断是否为普通查询
- if(x == null || y == null){
- // 根据类型分页查询
- Page<Shop> page = query()
- .eq("type_id", typeId)
- .page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
- // 返回数据
- return Result.ok(page.getRecords());
- }
- //2. 计算分页参数
- int start = (current - 1) * DEFAULT_BATCH_SIZE, end = current * DEFAULT_BATCH_SIZE;
- //3. 根据类型在Redis中查询
- String key = SHOP_GEO_KEY + typeId;
- GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults = stringRedisTemplate.opsForGeo().search(
- key,
- GeoReference.fromCoordinate(x, y),
- new Distance(5000),
- RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));
- if (geoResults == null) {
- return Result.ok(Collections.emptyList());
- }
- //4. 解析出id
- List<GeoResult<RedisGeoCommands.GeoLocation<String>>> content = geoResults.getContent();
- if(content.size() <= start){
- //后面要跳过start个元素,如果此时元素数量小于start
- return Result.ok(Collections.emptyList());
- }
- //4.1 截取start~end部分
- List<Long> ids = new ArrayList<>(content.size());
- Map<String, Distance> distanceMap = new HashMap<>(content.size());
- content.stream().skip(start).forEach(result -> {
- String idStr = result.getContent().getName();
- ids.add(Long.valueOf(idStr));
- distanceMap.put(idStr, result.getDistance());
- });
- //4.2 根据id取出shop
- String idStr = StrUtil.join(",", ids);
- List<Shop> shops = query().in("id", ids).last("order by FIELD(id," + idStr + ")").list();
- //4.3 给shop赋上distance属性
- shops.forEach(shop -> shop.setDistance(distanceMap.get(shop.getId().toString()).getValue()));
- return Result.ok(shops);
- }
假如用一张表来存储用户签到信息,每个用户的每次签到都是一次记录,会占用大量的空间
使用bitmap可以极大地改善。
我们按月来统计用户签到信息,签到记录为 1,未签到则记录为 0
把每一个 bit 位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这样一个月也只消耗31位(4字节)这种思路就称为位图(BitMap)
Redis 中 是利用 string 类型数据结构实现 BitMap,因此最大上限是 512M,转换为 bit 则是 2^32个 bit 位。
BitMap 的操作命令有:
SETBIT:向指定位置(offset)存入一个 0 或 1
GETBIT :获取指定位置(offset)的 bit 值
BITCOUNT :统计 BitMap 中值为 1 的 bit 位的数量
BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
BITFIELD_RO :获取 BitMap 中 bit 数组,并以十进制形式返回
BITOP :将多个 BitMap 的结果做位运算(与 、或、异或)
BITPOS :查找 bit 数组中指定范围内第一个 0 或 1 出现的位置
- @Override
- public Result sign() {
- //1.当前用户
- Long userId = UserHolder.getUser().getId();
- //2.当前日期
- LocalDateTime now = LocalDateTime.now();
- //3.签到
- String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
- String key = USER_SIGN_KEY + userId + keySuffix;
- stringRedisTemplate.opsForValue().setBit(key, now.getDayOfMonth() - 1, true);
- return Result.ok();
- }
统计本月到今天为止的连续签到天数
- @Override
- public Result signCount() {
- //1.当前用户
- Long userId = UserHolder.getUser().getId();
- //2.当前日期
- LocalDateTime now = LocalDateTime.now();
- //3.查询签到记录
- String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
- String key = USER_SIGN_KEY + userId + keySuffix;
- List<Long> result = stringRedisTemplate.opsForValue().bitField(
- key,
- BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(now.getDayOfMonth())).valueAt(0)
- );
- //没有任何签到结果
- if(result == null || result.isEmpty()){
- return Result.ok(0);
- }
- Long signed = result.get(0);
- //本月没有签到
- if(signed == null || signed == 0){
- return Result.ok(0);
- }
- int count = 0;
- while ((signed & 1) != 0) {
- count++;
- signed >>>= 1;
- }
- return Result.ok(count);
- }
Redis的HyperLogLog的统计功能
UV:全称 Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1 天内同一个用户多次访问该网站,只记录1次。
PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录 1 次PV,用户多次打开页面,则记录多次PV。
往往用来衡量网站的流量。
Hyperloglog(HLL)是从 Loglog 算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。
相关算法原理可以参考:https://juejin.cn/post/6844903785744056333#heading-0
Redis 中的 HLL 是基于 string 结构实现的,单个 HLL 的内存永远小于 16 kb,内存占用低,但相对的其测量结果是概率性的,有小于 0.81% 的误差。不过对于 UV 统计的庞大数量来说,这完全可以忽略。
通过单元测试,向 HyperLogLog 中添加 100 万条数据
- @Test
- void testHyperLogLog() {
- String[] values = new String[1000];
- int j = 0;
- for (int i = 0; i < 1000000; i++) {
- j = i % 1000;
- values[j] = "user_" + i;
- if (j == 999) {
- // 发送到 Redis
- stringRedisTemplate.opsForHyperLogLog().add("hl2", values);
- }
- }
- // 统计数量
- Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
- System.out.println("count = " + count);
- }
总结:
HyperLogLog 的作用:做海量数据的统计工作
HyperLogLog 的优点:内存占用极低、性能非常好
HyperLogLog 的缺点:有一定的误差
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。