赞
踩
有些业务请求,属于耗时操作,需要加锁,防止后续的并发操作,同时对数据库的数据进行操作,需要避免对之前的业务造成影响。
使用 Redis
作为分布式锁,将锁的状态放到 Redis
统一维护,解决集群中单机 JVM
信息不互通的问题,规定操作顺序,保护用户的数据正确。
梳理设计流程
新建注解 @interface,在注解里设定入参标志
增加 AOP 切点,扫描特定注解
建立 @Aspect 切面任务,注册 bean 和拦截特定方法
特定方法参数 ProceedingJoinPoint,对方法 pjp.proceed() 前后进行拦截的操作
切点前先进行加锁,任务执行后再进行删除 key
使用了 RedisTemplate 的 opsForValue.setIfAbsent 方法,判断是否有 key,设定一个随机数 UUID.random().toString,生成一个随机数作为 value。
从 redis 中获取锁之后,对 key 设定 expire 失效时间,到期后自动释放锁。
按照这种设计,只有第一个成功设定 Key
的请求,才能进行后续的数据操作,后续其它请求由于无法获得资源,将会失败结束。
担心切点执行的方法太耗时,导致 Redis
中的 key
由于超时提前释放了。
例如,线程 A 先获取锁,proceed 方法耗时,超过了锁超时时间,到期释放了锁,这时另一个线程 B 成功获取 Redis
锁,两个线程同时对同一批数据进行操作,导致数据不准确。
任务不完成,锁不释放:
维护了一个定时线程池 ScheduledExecutorService
,每隔 2s 去扫描加入队列中的 Task,判断是否失效时间是否快到了,失效时间<= 当前时间+失效间隔(三分之一就算超时)
- /**
- * 线程池,每个 JVM 使用一个线程去维护 keyAliveTime,定时执行 runnable
- */
- private static final ScheduledExecutorService SCHEDULER =
- new ScheduledThreadPoolExecutor(1,
- new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
- static {
- SCHEDULER.scheduleAtFixedRate(() -> {
- // do something to extend time
- }, 0, 2, TimeUnit.SECONDS);
- }
设计方案
主要核心地方:
拦截注解 @RedisLock,获取必要的参数
加锁操作
续时操作
结束业务,释放锁
之前也有整理过 AOP
使用方法,可以参考一下
- public enum RedisLockTypeEnum {
- /**
- * 自定义 key 前缀
- */
- ONE("Business1", "Test1"),
-
- TWO("Business2", "Test2");
- private String code;
- private String desc;
- RedisLockTypeEnum(String code, String desc) {
- this.code = code;
- this.desc = desc;
- }
- public String getCode() {
- return code;
- }
- public String getDesc() {
- return desc;
- }
- public String getUniqueKey(String key) {
- return String.format("%s:%s", this.getCode(), key);
- }
- }
- public class RedisLockDefinitionHolder {
- /**
- * 业务唯一 key
- */
- private String businessKey;
- /**
- * 加锁时间 (秒 s)
- */
- private Long lockTime;
- /**
- * 上次更新时间(ms)
- */
- private Long lastModifyTime;
- /**
- * 保存当前线程
- */
- private Thread currentTread;
- /**
- * 总共尝试次数
- */
- private int tryCount;
- /**
- * 当前尝试次数
- */
- private int currentCount;
- /**
- * 更新的时间周期(毫秒),公式 = 加锁时间(转成毫秒) / 3
- */
- private Long modifyPeriod;
- public RedisLockDefinitionHolder(String businessKey, Long lockTime, Long lastModifyTime, Thread currentTread, int tryCount) {
- this.businessKey = businessKey;
- this.lockTime = lockTime;
- this.lastModifyTime = lastModifyTime;
- this.currentTread = currentTread;
- this.tryCount = tryCount;
- this.modifyPeriod = lockTime * 1000 / 3;
- }
- }
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ElementType.METHOD, ElementType.TYPE})
- public @interface RedisLockAnnotation {
- /**
- * 特定参数识别,默认取第 0 个下标
- */
- int lockFiled() default 0;
- /**
- * 超时重试次数
- */
- int tryCount() default 3;
- /**
- * 自定义加锁类型
- */
- RedisLockTypeEnum typeEnum();
- /**
- * 释放时间,秒 s 单位
- */
- long lockTime() default 30;
- }
避免一个请求十分耗时,导致提前释放了锁。
- // 扫描的任务队列
- private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue();
- /**
- * 线程池,维护keyAliveTime
- */
- private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
- new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
- {
- // 两秒执行一次「续时」操作
- SCHEDULER.scheduleAtFixedRate(() -> {
- // 这里记得加 try-catch,否者报错后定时任务将不会再执行=-=
- Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
- while (iterator.hasNext()) {
- RedisLockDefinitionHolder holder = iterator.next();
- // 判空
- if (holder == null) {
- iterator.remove();
- continue;
- }
- // 判断 key 是否还有效,无效的话进行移除
- if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
- iterator.remove();
- continue;
- }
- // 超时重试次数,超过时给线程设定中断
- if (holder.getCurrentCount() > holder.getTryCount()) {
- holder.getCurrentTread().interrupt();
- iterator.remove();
- continue;
- }
- // 判断是否进入最后三分之一时间
- long curTime = System.currentTimeMillis();
- boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
- if (shouldExtend) {
- holder.setLastModifyTime(curTime);
- redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
- log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
- holder.setCurrentCount(holder.getCurrentCount() + 1);
- }
- }
- }, 0, 2, TimeUnit.SECONDS);
- }
做一个测试 在方法上添加该注解,然后设定相应参数即可,根据 typeEnum
可以区分多种业务,限制该业务被同时操作。
- @GetMapping("/test")
- @RedisLockAnnotation(typeEnum = RedisLockTypeEnum.ONE, lockTime = 3)
- public Book test(@RequestParam("userId") Long userId) {
- try {
- log.info("睡眠执行前");
- Thread.sleep(10000);
- log.info("睡眠执行后");
- } catch (Exception e) {
- // log error
- log.info("has some error", e);
- }
- return null;
- }
- 2022-10-10 14:55:50.864 INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : 睡眠执行前
- 2022-10-10 14:55:52.855 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 0
- 2022-10-10 14:55:54.851 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 1
- 2022-10-10 14:55:56.851 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 2
- 2022-10-10 14:55:58.852 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 3
- 2022-10-10 14:56:00.857 INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : has some error
- java.lang.InterruptedException: sleep interrupted
- at java.lang.Thread.sleep(Native Method) [na:1.8.0_221]
我这里测试的是重试次数过多,失败的场景,如果减少睡眠时间,就能让业务上正常执行跑通。
对于耗时业务和核心数据,不能让重复的请求同时操作数据,避免数据的不正确,所以要使用分布式锁来对它们进行保护这样就更好的提高效率。
整理的流程
新建注解 @interface,在注解里设定入参标志
增加 AOP 切点,扫描特定注解
建立 @Aspect 切面任务,注册 bean 和拦截特定方法
特定方法参数 ProceedingJoinPoint,对方法 pjp.proceed() 前后进行拦截
切点前进行加锁,任务执行后进行删除 key
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。