赞
踩
本套笔记基于黑马程序员《黑马点评》项目,供大家参考
目录
验证码我们使用手机号作为Key,value数据结构使用String;用户信息我们以随机token作为Key,Hash作为value的数据结构,并将随机token存储在浏览器前端每次请求都会携带这个随机的token.
主要实现客户向后端发送验证码,登入,根据/me接口查看当前用户是否有权限,进入界面
- /**
- * 发送手机验证码
- */
- @PostMapping("/code")
- public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
- // 发送短信验证码并保存验证码
- return userService.sendCode(phone, session);
- }
-
- /**
- * 登录功能
- * @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
- */
- @PostMapping("/login")
- public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){
- // 实现登录功能
- return userService.login(loginForm, session);
- }
-
- /**
- * 登入校验功能
- * @return 返回当前登入的用户
- */
- @GetMapping("/me")
- public Result me(){
- // 获取当前登录的用户并返回
- return Result.ok(ThreadLocalUtls.getUser());
- }
-
验证码匹配
- @Override
- public Result sendCode(String phone, HttpSession session) {
- // 1、判断手机号是否合法
- if (RegexUtils.isPhoneInvalid(phone)) {
- return Result.fail("手机号格式不正确");
- }
- // 2、手机号合法,生成验证码,并保存到Redis中
- String code = RandomUtil.randomNumbers(6);
- stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code,
- RedisConstants.LOGIN_CODE_TTL, TimeUnit.MINUTES);
- // 3、发送验证码
- log.info("验证码:{}", code);
- return Result.ok();
- }
实现用户登入功能:将UserDto对象存储在Redis中的时候,我们将对象转化为hash,其中hash的key和value值都需要是String类型,我们需要对对象的字段值进行处理转为String类型
设置Redis过期时间,我们需要完成在客户没有交互后的30钟内将其过期,而不是用户登入后30就将其强制过期
- /**
- * 用户登录
- *
- * @param loginForm
- * @param session
- * @return
- */
- @Override
- public Result login(LoginFormDTO loginForm, HttpSession session) {
- String phone = loginForm.getPhone();
- String code = loginForm.getCode();
- // 1、判断手机号是否合法
- if (RegexUtils.isPhoneInvalid(phone)) {
- return Result.fail("手机号格式不正确");
- }
- // 2、判断验证码是否正确
- String redisCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
- if (code == null || !code.equals(redisCode)) {
- return Result.fail("验证码不正确");
- }
- // 3、判断手机号是否是已存在的用户
- User user = this.getOne(new LambdaQueryWrapper<User>()
- .eq(User::getPhone, phone));
- if (Objects.isNull(user)) {
- // 用户不存在,需要注册
- user = createUserWithPhone(phone);
- }
- // 4、保存用户信息到Redis中,便于后面逻辑的判断(比如登录判断、随时取用户信息,减少对数据库的查询)
- UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
- // 将对象中字段全部转成string类型,StringRedisTemplate只能存字符串类型的数据
- Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
- CopyOptions.create().setIgnoreNullValue(true).
- setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
- String token = UUID.randomUUID().toString(true);
-
- String tokenKey = LOGIN_USER_KEY + token;
- stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
- log.info(tokenKey);
- stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
-
- // 登录成功,将用户信息保存到ThreadLocal中,方便后续登录校验
- //将用户信息保存到UserDTO,抹去了密码,实现了信息的安全
- ThreadLocalUtls.saveUser(BeanUtil.copyProperties(userMap, UserDTO.class));
- return Result.ok(token);
- }
-
- /**
- * 根据手机号创建用户并保存
- *
- * @param phone
- * @return
- */
- private User createUserWithPhone(String phone) {
- User user = new User();
- user.setPhone(phone);
- user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
- this.save(user);
- return user;
- }
第一个拦截器,我们获取当前用户放在ThreadLocal,如果没有就放行,并刷新有效期,对所有路径有效。
第二个拦截器,我们只判断当前用户是否登入,ThreadLocal是否有该用户
判断当前用户是否已登入
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- // 判断当前用户是否已登录
- if (ThreadLocalUtls.getUser() == null){
- // 当前用户未登录,直接拦截
- response.setStatus(HttpStatus.HTTP_UNAUTHORIZED);
- return false;
- }
- // 用户存在,直接放行
- return true;
- }
刷新Redis中当前用户的有效时间,拦截器没有被注解标识,因此RedisTemplate需要通过构造器的方法进行注入
- public class RefreshTokenInterceptor implements HandlerInterceptor {
-
- private StringRedisTemplate stringRedisTemplate;
-
- public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
- this.stringRedisTemplate = stringRedisTemplate;
- }
-
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- // 1、获取token,并判断token是否存在
- String token = request.getHeader("authorization");
- if (StrUtil.isBlank(token)){
- // token不存在,说明当前用户未登录,不需要刷新直接放行
- return true;
- }
- // 2、判断用户是否存在
- String tokenKey = LOGIN_USER_KEY + token;
- Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);
- if (userMap.isEmpty()){
- // 用户不存在,说明当前用户未登录,不需要刷新直接放行
- return true;
- }
- // 3、用户存在,则将用户信息保存到ThreadLocal中,方便后续逻辑处理,比如:方便获取和使用用户信息,Redis获取用户信息是具有侵入性的
- UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
- ThreadLocalUtls.saveUser(BeanUtil.copyProperties(userMap, UserDTO.class));
- // 4、刷新token有效期
- stringRedisTemplate.expire(token, LOGIN_USER_TTL, TimeUnit.MINUTES);
- return true;
- }
- }
将拦截器进行配置,order值越小越先执行
- @Configuration
- public class WebMvcConfig implements WebMvcConfigurer {
-
- @Resource
- private StringRedisTemplate stringRedisTemplate;
-
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- // 添加登录拦截器
- registry.addInterceptor(new LoginInterceptor())
- // 设置放行请求
- .excludePathPatterns(
- "/user/code",
- "/user/login",
- "/blog/hot",
- "/shop/**",
- "/shop-type/**",
- "/upload/**",
- "/voucher/**",
- // knife4j接口文档请求
- "/doc.html",
- "/webjars/**",
- "/swagger-resources",
- "/v2/api-docs"
- ).order(1); // 优先级默认都是0,值越大优先级越低
- // 添加刷新token的拦截器
- registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
- }
- }
我们将商品信息直接添加在Redis中,当我们在Redis中找不到的时候再将数据从数据库中更新到Redis中去,提高查询速度,但是会出现数据库和Redis的一致性问题
- /**
- * 根据id查询商铺信息
- *
- * @param id 商铺id
- * @return 商铺详情数据
- */
- @GetMapping("/{id}")
- public Result queryShopById(@PathVariable("id") Long id) {
- return shopService.queryById(id);
- }
-
- /**
- * 更新商铺信息
- *
- * @param shop 商铺数据
- * @return 无
- */
- @PutMapping
- public Result updateShop(@RequestBody Shop shop) {
- return shopService.updateShop(shop);
- }
我们先更新数据后,删除缓存;当查询缓存的时候未命中,我们查询数据库,将数据写入缓存;我们一定是先更新数据库后,删除缓存,而不是删除缓存后更新数据库,降低数据的不一致性。并设置超时时间,保证数据的强一致性
最佳实践方案:
- @Transactional
- @Override
- public Result updateShop(Shop shop) {
- // 参数校验, 略
-
- // 1、更新数据库中的店铺数据
- boolean f = this.updateById(shop);
- if (!f) {
- // 缓存更新失败,抛出异常,事务回滚
- throw new RuntimeException("数据库更新失败");
- }
- // 2、删除缓存
- f = stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId());
- if (!f) {
- // 缓存删除失败,抛出异常,事务回滚
- throw new RuntimeException("缓存删除失败");
- }
- return Result.ok();
- }
当数据库和Redis中都没有数据的时候,这样缓存就不会生效,请求将直接打到数据库中
我们修改Service层queryById(Long id)的处理逻辑:
- /**
- * 根据id查询数据(处理缓存穿透)
- *
- * @param keyPrefix key前缀
- * @param id 查询id
- * @param type 查询的数据类型
- * @param dbFallback 根据id查询数据的函数
- * @param timeout 有效期
- * @param unit 有效期的时间单位
- * @param <T>
- * @param <ID>
- * @return
- */
- public <T, ID> T handleCachePenetration(String keyPrefix, ID id, Class<T> type,
- Function<ID, T> dbFallback, Long timeout, TimeUnit unit) {
- String key = keyPrefix + id;
- // 1、从Redis中查询店铺数据
- String jsonStr = stringRedisTemplate.opsForValue().get(key);
-
- T t = null;
- // 2、判断缓存是否命中
- if (StrUtil.isNotBlank(jsonStr)) {
- // 2.1 缓存命中,直接返回店铺数据
- t = JSONUtil.toBean(jsonStr, type);
- return t;
- }
-
- // 2.2 缓存未命中,判断缓存中查询的数据是否是空字符串(isNotBlank把null和空字符串给排除了)
- if (Objects.nonNull(jsonStr)) {
- // 2.2.1 当前数据是空字符串(说明该数据是之前缓存的空对象),直接返回失败信息
- return null;
- }
- // 2.2.2 当前数据是null,则从数据库中查询店铺数据
- t = dbFallback.apply(id);
-
- // 4、判断数据库是否存在店铺数据
- if (Objects.isNull(t)) {
- // 4.1 数据库中不存在,缓存空对象(解决缓存穿透),返回失败信息
- this.set(key, "", CACHE_NULL_TTL, TimeUnit.SECONDS);
- return null;
- }
- // 4.2 数据库中存在,重建缓存,并返回店铺数据
- this.set(key, t, timeout, unit);
- return t;
- }
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
互斥锁:就是当线程1获得锁后,查询数据库重建缓冲数据,但是再这个时间内,其他线程需要处于一个等待的时间
逻辑过期:线程1获得锁后,委托另一个线程2完成重建缓冲数据和释放锁的工作,同时线程1和其他没有获得锁的线程,返回一个过期的数据。(过期时间作为value值放在Redis)
互斥锁方案更加强调数据的一致性,逻辑过期更加强调数据的可用性
我们使用Redis中的setnx lock 1,作为锁,当数据库中不存在的时候结果返回1,数据库已经存在则会返回0;缓存数据后,我们释放锁del lock
这里我们先不考虑缓存穿透的问题,即未命中的时候是null还是空字符串
我们首先需要创建如下的数据结构,记录过期时间
- @Data
- public class RedisData {
- /**
- * 过期时间
- */
- private LocalDateTime expireTime;
- /**
- * 缓存数据
- */
- private Object data;
- }
- /**
- * 根据id查询数据(处理缓存击穿)
- *
- * @param keyPrefix key前缀
- * @param id 查询id
- * @param type 查询的数据类型
- * @param dbFallback 根据id查询数据的函数
- * @param timeout 有效期
- * @param unit 有效期的时间单位
- * @param <T>
- * @param <ID>
- * @return
- */
- public <T, ID> T handleCacheBreakdown(String keyPrefix, ID id, Class<T> type,
- Function<ID, T> dbFallback, Long timeout, TimeUnit unit) {
- String key = keyPrefix + id;
- // 1、从Redis中查询店铺数据,并判断缓存是否命中
- String jsonStr = stringRedisTemplate.opsForValue().get(key);
- if (StrUtil.isBlank(jsonStr)) {
- // 1.1 缓存未命中,直接返回失败信息
- return null;
- }
- // 1.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期
- RedisData redisData = JSONUtil.toBean(jsonStr, RedisData.class);
- // 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段
- JSONObject data = (JSONObject) redisData.getData();
- T t = JSONUtil.toBean(data, type);
- LocalDateTime expireTime = redisData.getExpireTime();
- if (expireTime.isAfter(LocalDateTime.now())) {
- // 当前缓存数据未过期,直接返回
- return t;
- }
-
- // 2、缓存数据已过期,获取互斥锁,并且重建缓存
- String lockKey = LOCK_SHOP_KEY + id;
- boolean isLock = tryLock(lockKey);
- if (isLock) {
- // 获取锁成功,开启一个子线程去重建缓存
- CACHE_REBUILD_EXECUTOR.submit(() -> {
- try {
- // 查询数据库
- T t1 = dbFallback.apply(id);
- // 将查询到的数据保存到Redis
- this.setWithLogicalExpire(key, t1, timeout, unit);
- } finally {
- unlock(lockKey);
- }
- });
- }
-
- // 3、获取锁失败,再次查询缓存,判断缓存是否重建(这里双检是有必要的)
- jsonStr = stringRedisTemplate.opsForValue().get(key);
- if (StrUtil.isBlank(jsonStr)) {
- // 3.1 缓存未命中,直接返回失败信息
- return null;
- }
- // 3.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期
- redisData = JSONUtil.toBean(jsonStr, RedisData.class);
- // 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段
- data = (JSONObject) redisData.getData();
- t = JSONUtil.toBean(data, type);
- expireTime = redisData.getExpireTime();
- if (expireTime.isAfter(LocalDateTime.now())) {
- // 当前缓存数据未过期,直接返回
- return t;
- }
-
- // 4、返回过期数据
- return t;
- }
获取锁和释放锁
- /**
- * 获取锁
- *
- * @param key
- * @return
- */
- private boolean tryLock(String key) {
- Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
- // 拆箱要判空,防止NPE
- return BooleanUtil.isTrue(flag);
- }
-
- /**
- * 释放锁
- *
- * @param key
- */
- private void unlock(String key) {
- stringRedisTemplate.delete(key);
- }
实现全局唯一ID我们希望满足以下条件:
唯一性、递增性、安全性、高可用、高性能
实现全局唯一ID,我们使用时间戳+序列号,序列号我们取Redis自增长的自增长,但是他的key我们采用业务名称+时间的方式,防止超过范围
- /**
- * @author ghp
- * @title 分布式ID生成器
- * @description
- */
- @Component
- public class RedisIdWorker {
-
- @Resource
- private StringRedisTemplate stringRedisTemplate;
- /**
- * 开始时间戳
- */
- private static final long BEGIN_TIMESTAMP = 1640995200;
- /**
- * 序列化位数
- */
- private static final int COUNT_BITS = 32;
-
- /**
- * 生成分布式ID
- * @param keyPrefix
- * @return
- */
- public long nextId(String keyPrefix){
- // 1、生成时间戳
- LocalDateTime now = LocalDateTime.now();
- long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
- long timestamp = nowSecond - BEGIN_TIMESTAMP;
- // 2、生成序列号
- // 以当天的时间戳为key,防止一直自增下去导致超时,这样每天的极限都是 2^{31}
- String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
- Long count = stringRedisTemplate.opsForValue().increment(ID_PREFIX + keyPrefix + ":" + date);
- // 3、拼接并返回,时间戳向左移动32位
- return timestamp << COUNT_BITS | count;
- }
-
- public static void main(String[] args) {
- LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
- long second = time.toEpochSecond(ZoneOffset.UTC);
- System.out.println("second = " + second);
- }
- }
通过判断是否和之前的版本是否一致的乐观锁方法,成功率太低,我们需要对其进行改进,改为库存大于0的情况
set stock = stock - 1 where id = 10 and stock > 1
业务上,我们要加上一步,判断是否已经购买过优惠卷
这样简单的逻辑判断还不行,一个用户可能忽然有多个线程同时进行访问,仍然可能会出现一个用户购买多张优惠卷的问题,这个时候我们要加上锁,保证一个用户只有一个线程在判断是否已经购买。又因为这里是插入操作,我们使用悲观锁(更新操作我们可以使用乐观锁)
添加依赖
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjweaver</artifactId>
- </dependency>
在启动类上暴露代理对象
EnableAspectJAutoProxy(exposeProxy = true)
在一个JVM中会有一个锁监视器,保证只有一个线程,但是在集群情况下多个JVM就无法做到锁了,这个时候就会出现问题,这个时候我们就需要考虑分布式锁了
什么是分布式锁
线程1在执行任务阻塞的时候,锁被超时误删;如果线程二拿到了线程,线程1执行完任务后就会删除锁,这个时候线程3又进来了,造成了线程2和线程3冲突了
解决:Redis的值设置为UUID+线程id,删除的时候判断是不是自己的线程
线程在已经判断是自己的锁之后,在释放锁的过程中发生阻塞,这样,又会释放其他线程的锁
解决:这就要求我们将判断锁和释放锁封装在一起,使其具有原子性
在Resource目录下添加Lua脚本
编写Lua脚本
- ---
- --- Generated by EmmyLua(https://github.com/EmmyLua)
- --- Created by ghp.
- --- DateTime: 2023/7/13 16:19
- --- Description 释放锁(简单锁)
- ---
- -- 比较缓存中的线程标识与当前线程标识是否一致
- if (redis.call('get', KEYS[1]) == ARGV[1]) then
- -- 一致,直接删除
- return redis.call('del', KEYS[1])
- end
- -- 不一致,返回0
- return 0
加载Lua脚本
- /**
- * 加载Lua脚本
- */
- private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
-
- static {
- UNLOCK_SCRIPT = new DefaultRedisScript<>();
- UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
- UNLOCK_SCRIPT.setResultType(Long.class);
- }
完整分布式锁如下(我们通过StringRedisTemplate和name初始化锁,就可以获取锁和释放锁):
- public class SimpleRedisLock implements Lock {
-
- /**
- * RedisTemplate
- */
- private StringRedisTemplate stringRedisTemplate;
-
- /**
- * 锁的名称
- */
- private String name;
- /**
- * key前缀
- */
- private static final String KEY_PREFIX = "lock:";
- /**
- * ID前缀
- */
- private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
-
- /**
- * 创建锁对象
- */
- public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
- this.stringRedisTemplate = stringRedisTemplate;
- this.name = name;
- }
-
-
- /**
- * 获取锁
- *
- * @param timeoutSec 超时时间
- * @return
- */
- @Override
- public boolean tryLock(long timeoutSec) {
- String threadId = ID_PREFIX + Thread.currentThread().getId() + "";
- // SET lock:name id EX timeoutSec NX
- Boolean result = stringRedisTemplate.opsForValue()
- .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
- return Boolean.TRUE.equals(result);
- }
-
- /**
- * 加载Lua脚本
- */
- private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
-
- static {
- UNLOCK_SCRIPT = new DefaultRedisScript<>();
- UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
- UNLOCK_SCRIPT.setResultType(Long.class);
- }
-
- /**
- * 释放锁
- */
- @Override
- public void unlock() {
- // 执行lua脚本
- stringRedisTemplate.execute(
- UNLOCK_SCRIPT,
- Collections.singletonList(KEY_PREFIX + name),
- ID_PREFIX + Thread.currentThread().getId()
- );
- }
- }
解决不可重入问题的办法很简单:
在value中增加一个字段count,当同一个线程重新获取锁的时候增加count;同一个线程释放锁的时候减count,count等于0的时候在Redis中释放锁
添加依赖
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.11.5</version>
- </dependency>
主要步骤:1.创建锁 2.获取锁 3.释放锁
基于分布式锁创建订单如下
- /**
- * 创建订单
- *
- * @param voucherOrder
- */
- private void handleVoucherOrder(VoucherOrder voucherOrder) {
- Long userId = voucherOrder.getUserId();
- //1.声明一个锁
- RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
- //2.获取锁
- //参数1:最大等待时间(解决了不可重试的问题),锁自动释放时间,单位
- boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
- if (!isLock) {
- // 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)
- log.error("一人只能下一单");
- return;
- }
- try {
- // 创建订单(使用代理对象调用,是为了确保事务生效)
- proxy.createVoucherOrder(voucherOrder);
- } finally {
- //3.释放锁
- lock.unlock();
- }
- }
具体创建订单过程仍然使用事务控制
- /**
- * 创建订单
- *
- * @param voucherOrder
- * @return
- */
- @Transactional
- @Override
- public void createVoucherOrder(VoucherOrder voucherOrder) {
- Long userId = voucherOrder.getUserId();
- Long voucherId = voucherOrder.getVoucherId();
- // 1、判断当前用户是否是第一单
- int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
- .eq(VoucherOrder::getUserId, userId));
- if (count >= 1) {
- // 当前用户不是第一单
- log.error("当前用户不是第一单");
- return;
- }
- // 2、用户是第一单,可以下单,秒杀券库存数量减一
- boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
- .eq(SeckillVoucher::getVoucherId, voucherId)
- .gt(SeckillVoucher::getStock, 0)
- .setSql("stock = stock -1"));
- if (!flag) {
- throw new RuntimeException("秒杀券扣减失败");
- }
- // 3、将订单保存到数据库
- flag = this.save(voucherOrder);
- if (!flag) {
- throw new RuntimeException("创建秒杀券订单失败");
- }
- }
之前,我们我们资格判断和创建订单操作通过一条线程来完成,多次和数据库发生交换,这就导致我们任务的处理时间过程较长,这里我们主线程从Redis中获取信息判断资格,后面Mysql中数据用操作让另一条线程来完成(相当于服务员在外面帮客户点单,厨师在后面炒菜)。不过这里资格判断仍需要通过Lua脚本完成原子性操作。
在VoucherServiceImp中实现添加优惠卷信息
- /**
- * 新增秒杀券
- * @param voucher
- */
- @Override
- @Transactional
- public void addSeckillVoucher(Voucher voucher) {
- // 1、保存优惠券
- boolean result = save(voucher);
- if (!result){
- throw new RuntimeException("优惠券保存失败");
- }
- // 2、保存秒杀优惠券信息
- SeckillVoucher seckillVoucher = new SeckillVoucher();
- seckillVoucher.setVoucherId(voucher.getId());
- seckillVoucher.setStock(voucher.getStock());
- seckillVoucher.setBeginTime(voucher.getBeginTime());
- seckillVoucher.setEndTime(voucher.getEndTime());
- result = seckillVoucherService.save(seckillVoucher);
- if (!result){
- throw new RuntimeException("秒杀优惠券保存失败");
- }
- // 3、将秒杀券库存保存到Redis中(这给信息是可以持久化保存到Redis中,不想要的时候就进行手动删除)
- stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
- }
编写Lua脚本stream-seckill.lua
- ---
- --- Generated by EmmyLua(https://github.com/EmmyLua)
- --- Created by ghp.
- --- DateTime: 2023/7/16 18:22
- --- Description 判断库存是否充足 && 判断用户是否已下单 (以stream作为消息队列)
- ---
- -- 优惠券id
- local voucherId = ARGV[1];
- -- 用户id
- local userId = ARGV[2];
- -- 订单id
- local orderId = ARGV[3]
-
- -- 库存的key
- local stockKey = 'seckill:stock:' .. voucherId;
- -- 订单key
- local orderKey = 'seckill:order:' .. voucherId;
-
- -- 判断库存是否充足 get stockKey > 0 ?
- local stock = redis.call('GET', stockKey);
- if (tonumber(stock) <= 0) then
- -- 库存不足
- return 1;
- end
-
- -- 库存充足,判断用户是否已经下过单 SISMEMBER orderKey userId
- if (redis.call('SISMEMBER', orderKey, userId) == 1) then
- -- 用户下过单
- return 2;
- end
-
- -- 库存充足,没有下过单,扣库存、下单
- redis.call('INCRBY', stockKey, -1);
- redis.call('SADD', orderKey, userId);
- -- 发送消息到队列中,XDD stream.orders * key1 value1 key2 value2...
- redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId);
- -- 返回0,表示下单成功
- return 0;
加载Lua脚本
- /**
- * 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的Lua脚本
- */
- private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
-
- static {
- SECKILL_SCRIPT = new DefaultRedisScript<>();
- SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));
- SECKILL_SCRIPT.setResultType(Long.class);
- }
异步处理订单问题
基于JVM的异步处理有一个很大的问题,那就是当线程数量变多的时候会出现内存溢出的情况,所以这里,我们使用Redis的消息队列完成异步处理。(基于Redis的单消费者模式最大问题是,当消息数量较多的时候会出现漏读消息的问题)
Stream的消费者组模式有以下的优点
消息可回溯是指在不同消费组都可以收到消息,但是同一个消费者组中都是竞争关系,在同一个消费组里只会被消费一次。总结:独立与JVM以外,消息持久化,消息的确认机制
在Redis中添加队列和消费者组,stream.orders队列,g1消费者组
XGROUP CREATE stream.orders g1 0 MKSTREAM
编写Lua脚本,我们在判断购买资格的时候直接将UserId,VoucherID,OrderId写入消费者组中,并将Lua脚本加载,这一步和上面是一样的,只不过我们增加将消息加入消费组这一步。
第一步:seckillVoucher(Long voucherId)在Redis中完成对用户的秒杀,并将信息加入到Redis的队列中去。
第二步:VoucherOrderHandler()异步去队列接受消息,handlePendingList()处理对异常信息的处理,正常接收消息调用handleVoucherOrder()完成我们的订单任务,其会获取锁后调用代理对象通过createVoucherOrder()正真完成我们订单的创建。
基于分布式锁和异步处理订单,最终完成我们的秒杀业务,完整代码如下:
- package com.hmdp.service.impl;
-
- import cn.hutool.core.bean.BeanUtil;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.hmdp.constants.RedisConstants;
- import com.hmdp.mapper.VoucherOrderMapper;
- import com.hmdp.model.dto.Result;
- import com.hmdp.model.entity.SeckillVoucher;
- import com.hmdp.model.entity.VoucherOrder;
- import com.hmdp.service.ISeckillVoucherService;
- import com.hmdp.service.IVoucherOrderService;
- import com.hmdp.utils.RedisIdWorker;
- import com.hmdp.utils.ThreadLocalUtls;
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- import org.springframework.aop.framework.AopContext;
- import org.springframework.core.io.ClassPathResource;
- import org.springframework.data.redis.connection.RedisConnection;
- import org.springframework.data.redis.connection.stream.*;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.data.redis.core.script.DefaultRedisScript;
- import org.springframework.data.redis.core.script.RedisScript;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
- import java.time.Duration;
- import java.util.*;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
-
- import static com.hmdp.constants.RedisConstants.*;
-
- /**
- * <p>
- * 服务实现类
- * </p>
- *
- * @author ghp
- * @since 2021-12-22
- */
- @Service
- public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
-
- @Resource
- private ISeckillVoucherService seckillVoucherService;
-
- @Resource
- private RedisIdWorker redisIdWorker;
-
- @Resource
- private StringRedisTemplate stringRedisTemplate;
-
- @Resource
- private RedissonClient redissonClient;
-
- /**
- * VoucherOrderServiceImpl类的代理对象
- * 将代理对象的作用域进行提升,方面子线程取用
- */
- private IVoucherOrderService proxy;
-
-
-
- /**
- * 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的Lua脚本
- */
- private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
-
- static {
- SECKILL_SCRIPT = new DefaultRedisScript<>();
- SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));
- SECKILL_SCRIPT.setResultType(Long.class);
- }
-
- //===========================执行秒杀===========================================
- /**
- * 抢购秒杀券,实现对Redis中对资格判断,购买优惠卷的功能,并发送消息,通知去完成对数据库的操作
- *
- * @param voucherId
- * @return
- */
- @Transactional
- @Override
- public Result seckillVoucher(Long voucherId) {
- Long userId = ThreadLocalUtls.getUser().getId();
- long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
-
- // 1、执行Lua脚本,判断用户是否具有秒杀资格
- Long result = null;
- try {
- result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(),
- userId.toString(),
- String.valueOf(orderId)
- );
- } catch (Exception e) {
- log.error("Lua脚本执行失败");
- throw new RuntimeException(e);
- }
- if (result != null && !result.equals(0L)) {
- // result为1表示库存不足,result为2表示用户已下单
- int r = result.intValue();
- return Result.fail(r == 2 ? "不能重复下单" : "库存不足");
- }
-
- // 2、result为0,下单成功,直接返回ok
- // 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
- this.proxy = proxy;
- return Result.ok();
- }
-
-
- //===================创建线程完成在Mysql中订单的创建=======================
- /**
- * 当前类初始化完毕就立马执行该方法在Redis中创建我们的队列
- */
- @PostConstruct
- private void init() {
- // 创建消息队列
- DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
- mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
- mqScript.setResultType(Long.class);
- Long result = null;
- try {
- result = stringRedisTemplate.execute(mqScript,
- Collections.emptyList(),
- QUEUE_NAME,
- GROUP_NAME);
- } catch (Exception e) {
- log.error("队列创建失败", e);
- return;
- }
- int r = result.intValue();
- String info = r == 1 ? "队列创建成功" : "队列已存在";
- log.debug(info);
- // 执行线程任务
- SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
- }
-
- /**
- * 线程池
- */
- private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
-
- //============================从阻塞队列中获取订单信息进行处理=====================================
- /**
- * 线程任务: 不断从阻塞队列中获取订单信息,使用handleVoucherOrder(voucherOrder)处理订单消息
- * 当出现异常的时候,我们使用handlePendingList()方法从pendinglist中获取消息进行处理
- */
- private class VoucherOrderHandler implements Runnable {
- @Override
- public void run() {
- while (true) {
- try {
- // 1、从消息队列中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS stream.orders >
- List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
- Consumer.from("g1","c1"),//g1消费组的名称,c1消费者名称
- //Consumer.from(GROUP_NAME, "c1"),
- StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),//读取一个,阻塞1秒钟
- StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())//读取队列stream.orders,ReadOffset.lastConsumed()读取最新未读取的队列
- );
- // 2、判断消息获取是否成功
- if (messageList == null || messageList.isEmpty()) {
- // 2.1 消息获取失败,说明没有消息,进入下一次循环获取消息
- continue;
- }
- // 3、消息获取成功,可以下单
- // 将消息转成VoucherOrder对象
- MapRecord<String, Object, Object> record = messageList.get(0);//获取的是消息id,字段名称,字段值
- Map<Object, Object> messageMap = record.getValue();//获取字段名称和字段值
- //将结果转为voucher对象,注意:之前我们在Lua脚本中定义的字段名称和voucher字段名称刚好对应
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
-
- handleVoucherOrder(voucherOrder);
- // 4、ACK确认 SACK stream.orders g1 id(队列名称,消费组,消息id)
- stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
- } catch (Exception e) {
- log.error("处理订单异常", e);
- //如果处理异常,我们就要尝试去PendingList中将消息取出来进行处理
- // 处理异常消息
- handlePendingList();
- }
- }
- }
- }
-
-
-
-
-
- //==========================使用 handlePendingList对未处理成功的信息进行处理=================================
- private void handlePendingList() {
- while (true) {
- try {
- // 1、从pendingList中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0
- List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
- Consumer.from(GROUP_NAME, "c1"),
- StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
- StreamOffset.create(QUEUE_NAME, ReadOffset.from("0"))
- );
- // 2、判断pendingList中是否有效性
- if (messageList == null || messageList.isEmpty()) {
- // 2.1 pendingList中没有消息,直接结束循环
- break;
- }
- // 3、pendingList中有消息
- // 将消息转成VoucherOrder对象
- MapRecord<String, Object, Object> record = messageList.get(0);
- Map<Object, Object> messageMap = record.getValue();
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
- handleVoucherOrder(voucherOrder);
- // 4、ACK确认 SACK stream.orders g1 id
- stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
- } catch (Exception e) {
- log.error("处理订单异常", e);
- // 这里不用调自己,直接就进入下一次循环,再从pendingList中取,这里只需要休眠一下,防止获取消息太频繁
- try {
- Thread.sleep(20);
- } catch (InterruptedException ex) {
- log.error("线程休眠异常", ex);
- }
- }
- }
- }
-
-
-
- //========================获取锁并调用createVoucherOrder()完成对订单的创建===========================
- /**
- * 创建订单
- *
- * @param voucherOrder
- */
- private void handleVoucherOrder(VoucherOrder voucherOrder) {
- Long userId = voucherOrder.getUserId();
- //1.声明一个锁
- RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
- //2.获取锁
- //参数1:最大等待时间(解决了不可重试的问题),锁自动释放时间,单位
- boolean isLock = lock.tryLock();
- if (!isLock) {
- // 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)
- log.error("一人只能下一单");
- return;
- }
- try {
- // 创建订单(使用代理对象调用,是为了确保事务生效)
- proxy.createVoucherOrder(voucherOrder);
- } finally {
- //3.释放锁
- lock.unlock();
- }
- }
-
- /**
- * 创建订单
- *
- * @param voucherOrder
- * @return
- */
- @Transactional
- @Override
- public void createVoucherOrder(VoucherOrder voucherOrder) {
- Long userId = voucherOrder.getUserId();
- Long voucherId = voucherOrder.getVoucherId();
- // 1、判断当前用户是否是第一单
- int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
- .eq(VoucherOrder::getUserId, userId));
- if (count >= 1) {
- // 当前用户不是第一单
- log.error("当前用户不是第一单");
- return;
- }
- // 2、用户是第一单,可以下单,秒杀券库存数量减一
- boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
- .eq(SeckillVoucher::getVoucherId, voucherId)
- .gt(SeckillVoucher::getStock, 0)
- .setSql("stock = stock -1"));
- if (!flag) {
- throw new RuntimeException("秒杀券扣减失败");
- }
- // 3、将订单保存到数据库
- flag = this.save(voucherOrder);
- if (!flag) {
- throw new RuntimeException("创建秒杀券订单失败");
- }
- }
- }
@TableField表示该字段是后面添加进去的
为了同时满足判断元素是否在集合内,以及元素的排序,我们在Redis中使用scored_set类型
- ZADD z1 1 m1 2 m2 3 m3 //m1 m2 m3 元素的值分别为1 2 3
- ZSCORE z1 m1 // 得到结果1
- ZRANGE z1 0 4 //查前5名
分析:1.我们使用scored_set类型,按用户的点赞时间作为分数进行排序 2.用户每次点赞我们在数据库设置blog的liked加1 3.在blog实体类中增加字段isLiked表示当前用户是否点赞了博客
判断当前blog是否被点赞,如果被点赞在blog实体类标识被点赞
- /**
- * 判断当前用户是否点赞该博客
- */
- private void isBlogLiked(Blog blog) {
- UserDTO user = ThreadLocalUtls.getUser();
- if (Objects.isNull(user)) {
- // 当前用户未登录,无需查询点赞
- return;
- }
- Long userId = user.getId();
- String key = BLOG_LIKED_KEY + blog.getId();
- Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
- //在blog中标识被当前用户已经点赞
- blog.setIsLike(Objects.nonNull(score));
- }
实现点赞功能,用户没点赞过则点赞,用户点赞了则取消点赞
- /**
- * 点赞
- *
- * @param id
- * @return
- */
- @Override
- public Result likeBlog(Long id) {
- // 1、判断用户是否点赞
- Long userId = ThreadLocalUtls.getUser().getId();
- String key = BLOG_LIKED_KEY + id;
- // 在score_set中查用户的分数,如果查到了说明存在
- Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
- boolean result;
- if (score == null) {
- // 1.1 用户未点赞,点赞数+1
- result = this.update(new LambdaUpdateWrapper<Blog>()
- .eq(Blog::getId, id)
- .setSql("liked = liked + 1"));
- if (result) {
- // 数据库更新成功,更新缓存 zadd key value score,分数我们按时间戳去存
- stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
- }
- } else {
- // 1.2 用户已点赞,点赞数-1
- result = this.update(new LambdaUpdateWrapper<Blog>()
- .eq(Blog::getId, id)
- .setSql("liked = liked - 1"));
- if (result) {
- // 数据更新成功,更新缓存 zrem key value
- stringRedisTemplate.opsForZSet().remove(key, userId.toString());
- }
- }
- return Result.ok();
- }
查询点赞前5的用户,我们先从Redis中查询前5的用户id,再用id从Mysql中查询,注意id的顺序必须是降序
- /**
- * 查询Top5的点赞用户 zrange key 0 4
- * @param id
- * @return
- */
- @Override
- public Result queryBlogLikes(Long id) {
- // 查询Top5的点赞用户 zrange key 0 4
- String key = BLOG_LIKED_KEY + id;
- Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
- if (top5 == null || top5.isEmpty()) {
- return Result.ok(Collections.emptyList());
- }
- List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
- String idStr = StrUtil.join(",", ids);
- // 根据id降序排序 select * from tb_user where id in(1,5) order by field(id, 1, 5)
- List<UserDTO> userDTOList = userService.list(new LambdaQueryWrapper<User>()
- .in(User::getId, ids)
- .last("order by field (id," + idStr + ")"))
- .stream()
- .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
- .collect(Collectors.toList());
- return Result.ok(userDTOList);
- }
用户和博主属于多对多的关系,因此我们建立一张多对多的关系表
关注和取消关注,我们同时在Redis和关系表中取消增加用户信息
- /**
- * 关注用户
- *
- * @param followUserId 关注用户的id
- * @param isFollow 是否已关注
- * @return
- */
- @Override
- public Result follow(Long followUserId, Boolean isFollow) {
- Long userId = ThreadLocalUtls.getUser().getId();
- String key = FOLLOW_KEY + userId;
- if (isFollow) {
- // 用户未关注,则关注
- Follow follow = new Follow();
- follow.setUserId(userId);
- follow.setFollowUserId(followUserId);
- boolean isSuccess = this.save(follow);
- if (isSuccess) {
- // 用户关注信息保存成功,把关注的用户id放入Redis的Set集合中,
- stringRedisTemplate.opsForSet().add(key, followUserId.toString());
- }
- } else {
- // 用户已关注,删除关注信息
- boolean isSuccess = this.remove(new LambdaQueryWrapper<Follow>()
- .eq(Follow::getUserId, userId)
- .eq(Follow::getFollowUserId, followUserId));
- if (isSuccess) {
- stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
- }
- }
- return Result.ok();
- }
直接从数据中判断用户是否关注博主
- /**
- * 是否关注用户
- *
- * @param followUserId 关注用户的id
- * @return
- */
- @Override
- public Result isFollow(Long followUserId) {
- Long userId = ThreadLocalUtls.getUser().getId();
- int count = this.count(new LambdaQueryWrapper<Follow>()
- .eq(Follow::getUserId, userId)
- .eq(Follow::getFollowUserId, followUserId));
- return Result.ok(count > 0);
- }
利用Redis中SINTER实现共同关注
- SADD s1 m1 m2
- SADD s2 m2 m3
- SINTER s1 s2 //返回m2
- /**
- * 查询共同关注
- *
- * @param id
- * @return
- */
- @Override
- public Result followCommons(Long id) {
- Long userId = ThreadLocalUtls.getUser().getId();
- String key1 = FOLLOW_KEY + userId;
- String key2 = FOLLOW_KEY + id;
- // 查询当前用户与目标用户的共同关注对象
- Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
- if (Objects.isNull(intersect) || intersect.isEmpty()) {
- return Result.ok(Collections.emptyList());
- }
- List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
- // 查询共同关注的用户信息
- List<UserDTO> userDTOList = userService.listByIds(ids).stream()
- .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
- .collect(Collectors.toList());
- return Result.ok(userDTOList);
- }
大V对普通粉丝使用拉的方式先发到发件箱里,他们需要的时候再将消息推送给普通用户,对于活跃用户使用推的方式,直接将消息推送到他们的收件箱;对于普通人,直接使用推的方式,直接将消息发送到他们的收件箱
使用sorted_set和list都可以实现上面的需求但是,但是这里我们需要按滚动分页查询,即使有用户发送了新的消息,我们都从上一次分页的最后一条开始查询,所以这里我们直接采用sorted_set实现该功能
- /**
- * 保存探店笔记
- *
- * @param blog
- * @return
- */
- @Override
- public Result saveBlog(Blog blog) {
- Long userId = ThreadLocalUtls.getUser().getId();
- blog.setUserId(userId);
- // 保存探店笔记
- boolean isSuccess = this.save(blog);
- if (!isSuccess) {
- return Result.fail("笔记保存失败");
- }
- // 查询笔记作者的所有粉丝,找到FollowId与等于作者UserId的所有用户就可以了
- List<Follow> follows = followService.list(new LambdaQueryWrapper<Follow>()
- .eq(Follow::getFollowUserId, userId));
- // 将笔记推送给所有的粉丝
- for (Follow follow : follows) {
- // 获取粉丝的id
- Long id = follow.getUserId();
- // 推送笔记
- String key = FEED_KEY + id;
- //将blogId推送到每一个粉丝的收件箱,粉丝收件箱的key=FEED_KEY + id
- stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
- }
- return Result.ok(blog.getId());
- }
滚动分页查询的实现
参数:5~0表示范围,1表示小于等于5的偏移量,3表示输出3个
这样输出有一个问题如果sorted_set末尾有两个相同的数,偏移量仍是1的话就会出现问题
可以看到offset仍是1的话,6会重复出现,所以offset应该是上一次查询最后一个数的个数
- /**
- * 关注推送页面的分页查询
- */
- @GetMapping("/of/follow")
- public Result queryBlogOfFollow(@RequestParam("lastId") Long max,
- @RequestParam(value = "offset", defaultValue = "0") Integer offset) {
- return blogService.queryBlogOfFollow(max, offset);
- }
注意:这里使用blogId查询blog,我们需要使用自己的顺序
- /**
- * 关注推送页面的笔记分页
- *
- * @param max
- * @param offset
- * @return
- */
- @Override
- public Result queryBlogOfFollow(Long max, Integer offset) {
- // 1、查询收件箱
- Long userId = ThreadLocalUtls.getUser().getId();
- String key = FEED_KEY + userId;
- // ZREVRANGEBYSCORE key Max Min LIMIT offset count
- Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
- .reverseRangeByScoreWithScores(key, 0, max, offset, 2);
- // 2、判断收件箱中是否有数据
- if (typedTuples == null || typedTuples.isEmpty()) {
- return Result.ok();
- }
-
- // 3、收件箱中有数据,则解析数据: blogId、minTime(时间戳)、offset
- List<Long> ids = new ArrayList<>(typedTuples.size());
- long minTime = 0; // 记录当前最小值
- int os = 1; // 偏移量offset,用来计数
- for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2
- // 获取id
- ids.add(Long.valueOf(tuple.getValue()));
- // 获取分数(时间戳)
- long time = tuple.getScore().longValue();
- if (time == minTime) {
- // 当前时间等于最小时间,偏移量+1
- os++;
- } else {
- // 当前时间不等于最小时间,重置
- minTime = time;
- os = 1;
- }
- }
-
- // 4、根据id查询blog(使用in查询的数据是默认按照id升序排序的,这里需要使用我们自己指定的顺序排序)
- String idStr = StrUtil.join(",", ids);
- List<Blog> blogs = this.list(new LambdaQueryWrapper<Blog>().in(Blog::getId, ids)
- .last("ORDER BY FIELD(id," + idStr + ")"));
-
- // 设置blog相关的用户数据,是否被点赞等属性值
- for (Blog blog : blogs) {
- // 查询blog有关的用户
- queryUserByBlog(blog);
- // 查询blog是否被点赞
- isBlogLiked(blog);
- }
-
- // 5、封装并返回
- ScrollResult scrollResult = new ScrollResult();
- scrollResult.setList(blogs);
- scrollResult.setOffset(os);
- scrollResult.setMinTime(minTime);
-
- return Result.ok(scrollResult);
- }
1.添加数据,并计算距离
2.搜索附件10KM火车站
我们按店铺类型将店铺的坐标信息导入Redis中,我们将SHOP_GEO_KEY+店铺类型id作为我们的key,店铺的坐标+id作为我们的value
- /**
- * 预热店铺数据,按照typeId进行分组,用于实现附近商户搜索功能
- */
- @Test
- public void loadShopListToCache() {
- // 1、获取店铺数据
- List<Shop> shopList = shopService.list();
- // 2、根据 typeId 进行分类
- // Map<Long, List<Shop>> shopMap = new HashMap<>();
- // for (Shop shop : shopList) {
- // Long shopId = shop.getId();
- // if (shopMap.containsKey(shopId)){
- // // 已存在,添加到已有的集合中
- // shopMap.get(shopId).add(shop);
- // }else{
- // // 不存在,直接添加
- // shopMap.put(shopId, Arrays.asList(shop));
- // }
- // }
- // 使用 Lambda 表达式,更加优雅(优雅永不过时)
- Map<Long, List<Shop>> shopMap = shopList.stream()
- .collect(Collectors.groupingBy(Shop::getTypeId));
-
- // 3、将分好类的店铺数据写入redis
- for (Map.Entry<Long, List<Shop>> shopMapEntry : shopMap.entrySet()) {
- // 3.1 获取 typeId
- Long typeId = shopMapEntry.getKey();
- List<Shop> values = shopMapEntry.getValue();
- // 3.2 将同类型的店铺的写入同一个GEO ( GEOADD key 经度 维度 member )
- String key = SHOP_GEO_KEY + typeId;
- // 方式一:单个写入(这种方式,一个请求一个请求的发送,十分耗费资源,我们可以进行批量操作)
- // for (Shop shop : values) {
- // stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()),
- // shop.getId().toString());
- // }
- // 方式二:批量写入
- List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>();
- for (Shop shop : values) {
- locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),
- new Point(shop.getX(), shop.getY())));
- }
- stringRedisTemplate.opsForGeo().add(key, locations);
- }
- }
接口
- /**
- * 根据商铺类型分页查询商铺信息
- *
- * @param typeId 商铺类型
- * @param current 页码
- * @return 商铺列表
- */
- @GetMapping("/of/type")
- public Result queryShopByType(
- @RequestParam("typeId") Integer typeId,
- @RequestParam(value = "current", defaultValue = "1") Integer current,
- @RequestParam(value = "x", required = false) Double x,
- @RequestParam(value = "y", required = false) Double y) {
- return shopService.queryShopByType(typeId, current, x, y);
- }
实现附件商铺
当没有传入x y的时候,我们直接从数据库中查询数据,当有x y传入的时候我们再从Redis中查询数据,但是Redis只会返回0~end的数据,我们需要手动截取from~end的数据
- /**
- * 分页查询店铺数据
- *
- * @param typeId 店铺类型ID
- * @param current 页码
- * @param x 坐标x轴
- * @param y 坐标y轴
- * @return
- */
- @Override
- public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
- // 1、判断是否需要根据坐标查询
- if (x == null || y == null) {
- // 不需要坐标查询,按数据库查询
- Page<Shop> page = this.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE),
- new LambdaQueryWrapper<Shop>().eq(Shop::getTypeId, typeId));
- // 返回数据
- return Result.ok(page.getRecords());
- }
-
- // 2、需要查询坐标,则需要到Redis中进行查询
- // 2.1 计算分页参数
- int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
- int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
- // 2.2 根据经纬度从redis中查询店铺数据,并按照距离排序、分页
- String key = SHOP_GEO_KEY + typeId;
- // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE 结果: shopId、distance
- GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate
- .opsForGeo().search(
- key,
- GeoReference.fromCoordinate(x, y),
- // 默认搜索范围是5km,默认单位m
- new Distance(5000),
- // 查询从0到end,所以后面还需要截取from到end之间的数据
- RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));
-
- // 4、解析出店铺id
- // 4.1 健壮性判断,防止skip出现NPE
- if (results == null) {
- // 缓存中不存在店铺数据
- return Result.ok(Collections.emptyList());
- }
-
-
-
-
- // 4.2 缓存中存在店铺数据,则需要截取 from ~ end的部分,需要判断from到end之间的数据是否存在
- List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
- if (list.size() <= from) {
- // 当前数据比起始索引还要小,说明没有我们要查询页的数据
- return Result.ok(Collections.emptyList());
- }
- // 4.3 from到end之间的数据存在,则解析出店铺id
- List<Long> ids = new ArrayList<>(list.size());
- Map<String, Distance> distanceMap = new HashMap<>(list.size());
- // skip表示直接从第from个数据开始遍历
- list.stream().skip(from).forEach(result -> {
- // 获取店铺id
- String shopIdStr = result.getContent().getName();
- ids.add(Long.valueOf(shopIdStr));
- // 获取店铺距离
- Distance distance = result.getDistance();
- distanceMap.put(shopIdStr, distance);
- });
-
-
-
-
- // 5、根据店铺ids查询出店铺数据
- String idStr = StrUtil.join(",", ids);
- // 5.1 查寻出所有符合条件的店铺数据(这里需要利用ORDER BY FIELD确保id的有序性)
- List<Shop> shopList = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
- // List<Shop> shopList = this.list(new LambdaQueryWrapper<Shop>()
- // .in(Shop::getId, ids)
- // .last("ORDER BY FIELD(id," + idStr + ")"));
- // 5.2 为店铺的距离属性进行赋值
- for (Shop shop : shopList) {
- shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
- }
-
- // 6、返回
- return Result.ok(shopList);
- }
使用Redis中的BitMap类型统计签到
BITFIELD bm1 GET u2 0 u表示按无符号 2表示读取2个数 0表示从0位开始读 返回10进制数对应的二进制就是11
BITPOS bm1 0 表示第一次出现0的位置 BITPOS bm1 1 表示第一次出现1的位置
- /**
- * 用户签到
- *
- * @return
- */
- @Override
- public Result sign() {
- // 获取当前登录用户
- Long userId = ThreadLocalUtls.getUser().getId();
- // 获取日期
- LocalDateTime now = LocalDateTime.now();
- // 拼接key
- String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
- String key = USER_SIGN_KEY + userId + keySuffix;
- // 获取今天是本月的第几天
- int dayOfMonth = now.getDayOfMonth();
- // 写入Redis SETBIT key offset 1
- stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
- return Result.ok();
- }
- /**
- * 记录连续签到的天数
- *
- * @return
- */
- @Override
- public Result signCount() {
- // 1、获取签到记录
- // 获取当前登录用户
- Long userId = ThreadLocalUtls.getUser().getId();
- // 获取日期
- LocalDateTime now = LocalDateTime.now();
- // 拼接key
- String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
- String key = USER_SIGN_KEY + userId + keySuffix;
- // 获取今天是本月的第几天
- int dayOfMonth = now.getDayOfMonth();
- // 获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
- List<Long> result = stringRedisTemplate.opsForValue().bitField(
- key,
- BitFieldSubCommands.create()
- .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
- );
- // 2、判断签到记录是否存在
- if (result == null || result.isEmpty()) {
- // 没有任何签到结果
- return Result.ok(0);
- }
- // 3、获取本月的签到数(List<Long>是因为BitFieldSubCommands是一个子命令,可能存在多个返回结果,这里我们知识使用了Get,
- // 可以明确只有一个返回结果,即为本月的签到数,所以这里就可以直接通过get(0)来获取)
- Long num = result.get(0);
- if (num == null || num == 0) {
- // 二次判断签到结果是否存在,让代码更加健壮
- return Result.ok(0);
- }
- // 4、循环遍历,获取连续签到的天数(从当前天起始)
- int count = 0;
- while (true) {
- // 让这个数字与1做与运算,得到数字的最后一个bit位,并且判断这个bit位是否为0
- if ((num & 1) == 0) {
- // 如果为0,说明未签到,结束
- break;
- } else {
- // 如果不为0,说明已签到,计数器+1
- count++;
- }
- // 把数字右移一位,抛弃最后一个bit位,继续下一个bit位
- num >>>= 1;
- }
- return Result.ok(count);
- }
使用HLL不可以统计重复的数据,相同的数据只会被当作一次统计
最后返回结果997593,误差为百分之一
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。