赞
踩
所谓幂等性,简单地说就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
考虑以下几个场景:
在这里顺便说一下我自己的理解,防重设计 和 幂等设计,其实是有区别的。防重设计主要为了避免产生重复数据,对接口返回没有太多要求。而幂等设计除了避免产生重复数据之外,还要求每次请求都返回一样的结果,这就要求对结果做适当的缓存,针对相同的参数返回一样的结果。而 加锁 只是针对线程安全的措施,并行转串行,对效率影响很大,可作为 防重设计 和 幂等设计 的辅助工具。
绝大数情况下,为了防止重复数据的产生,我们都会在表中加唯一索引,这是一个非常简单,并且有效的方案。加了唯一索引之后,第一次请求数据可以插入成功。但后面的相同请求,插入数据时会报唯一索引冲突异常。虽说抛异常对数据来说没有影响,不会造成错误数据。但是为了保证接口幂等性,我们需要对该异常进行捕获,然后返回成功。
具体步骤:
有时候表中并非所有的场景都不允许产生重复的数据,只有某些特定场景才不允许。这时候,直接在表中加唯一索引,显然是不太合适的。针对这种情况,我们可以通过建防重表来解决问题。
该表可以只包含两个字段:id 和 唯一索引,唯一索引可以是多个字段比如:name、code等组合起来的唯一标识,例如:susan_0001。
具体步骤:
乐观锁的实现方式多种多样,可以通过在表中增加一个timestamp或者version字段或者直接使用已有的状态字段,具体选用何种方式需要考虑具体的业务要求,这里以version字段为例。
在更新数据之前先查询一下数据:
select id,amount,version from user id=123;
如果数据存在,假设查到的version等于1,再使用id和version字段作为查询条件更新数据:
update user set amount=amount+100,version=version+1 where id=123 and version=1;
更新数据的同时version+1,然后判断本次update操作的影响行数,如果大于0,则说明本次更新成功,如果等于0,则说明本次更新没有让数据变更。
由于第一次请求version等于1是可以成功的,操作成功后version变成2了。这时如果并发的请求过来,再执行和上面一样的sql,那么,该update操作不会真正更新数据,最终sql的执行结果影响行数是0,因为version已经变成2了,where中的version=1肯定无法满足条件。但为了保证接口幂等性,接口可以直接返回成功,因为version值已经修改了,那么前面必定已经成功过一次,后面都是重复的请求。
具体步骤:
在支付场景中,用户A的账号余额有150元,想转出100元,正常情况下用户A的余额只剩50元。一般情况下,sql是这样的:
update user amount = amount-100 where id=123;
如果出现多次相同的请求,可能会导致用户A的余额变成负数。这种情况,用户A来可能要哭了。于此同时,系统开发人员可能也要哭了,因为这是很严重的系统bug。
为了解决这个问题,可以加悲观锁,将用户A的那行数据锁住,在同一时刻只允许一个请求获得锁,更新数据,其他的请求则等待。
select * from user id=123 for update;
具体步骤:
不需要第三方工具的支持,jdk自带工具就可以实现。加锁的时候不需要设置超时时间,因为只要程序没有挂掉,不管有没有异常最终都会走finally方法释放锁,就算程序挂掉了,重新起来后,加锁数据都是空的了,因为数据都是存到内存的。这样有个好处,就是避免了加锁之后,业务还没处理完,锁到期的情况。
代码片段如下
定义注解
- @Target(ElementType.METHOD)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface Resubmit {
- /**
- * 释放锁多久之后可以再次提交
- */
- int delay() default 20;
-
- /**
- * 延时时间单位
- */
- TimeUnit timeUnit() default TimeUnit.SECONDS;
- }
封装加锁工具
- @Slf4j
- public final class ResubmitLock {
- private static final ConcurrentHashMap<String, Object> LOCK_CACHE = new ConcurrentHashMap<>(200);
- private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadPoolExecutor.DiscardPolicy());
- private ResubmitLock() {
- }
-
- /**
- * 静态内部类 单例模式
- *
- * @return
- */
- private static class SingletonInstance {
- private static final ResubmitLock INSTANCE = new ResubmitLock();
- }
-
- public static ResubmitLock getInstance() {
- return SingletonInstance.INSTANCE;
- }
-
-
- public static String handleKey(String param) {
- return DigestUtil.md5Hex(param == null ? "" : param);
- }
-
- /**
- * 加锁 putIfAbsent 是原子操作保证线程安全
- *
- * @param key 对应的key
- * @param value
- * @return
- */
- public boolean lock(final String key, Object value) {
- return Objects.isNull(LOCK_CACHE.putIfAbsent(key, value));
- }
-
- /**
- * 延时释放锁 用以控制短时间内的重复提交
- *
- * @param key 对应的key
- * @param delay 延时时间
- * @param timeUnit 延时时间单位
- */
- public void unLock(final String key, final int delay, final TimeUnit timeUnit) {
- EXECUTOR.schedule(() -> {
- LOCK_CACHE.remove(key);
- }, delay, timeUnit);
- }
- }

定义切面
- @Slf4j
- @Aspect
- @Component
- public class ResubmitDataAspect {
- private final static Object PRESENT = new Object();
-
- @Around("@annotation(com.mysuc.local.Resubmit)")
- public Object handleResubmit(ProceedingJoinPoint joinPoint) throws Throwable {
- Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
- // 获取注解信息
- Resubmit annotation = method.getAnnotation(Resubmit.class);
- int delay = annotation.delay();
- TimeUnit timeUnit = annotation.timeUnit();
- HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
- // 只针对PUT和POST请求验重
- if(StringUtils.startsWithIgnoreCase(request.getContentType(), MediaType.APPLICATION_JSON_VALUE) && StringUtils.equalsAny(request.getMethod(), HttpMethod.PUT.name(),HttpMethod.POST.name())){
- // 获取请求body内容
- String bodyStr = "";
- try (final BufferedReader reader = request.getReader()) {
- bodyStr = IoUtil.read(reader);
- } catch (IOException e) {
- throw new IORuntimeException(e);
- }
- if (StringUtils.isEmpty(bodyStr)) {
- return CommonResult.error("请求body不能为空");
- }
- // 生成加密参数 使用了content_MD5的加密方式
- String key = ResubmitLock.handleKey(bodyStr);
- log.info("加锁key值:{}",key);
- // 执行锁
- boolean lock = false;
- //设置解锁key
- lock = ResubmitLock.getInstance().lock(key, PRESENT);
- if (lock) {
- try{
- //放行
- return joinPoint.proceed();
- }finally {
- //设置解锁key和解锁时间
- ResubmitLock.getInstance().unLock(key, delay, timeUnit);
- }
- } else {
- //响应重复提交异常
- return CommonResult.error(ResultEnum.REPEATED_REQUESTS);
- }
- }else{
- //放行
- return joinPoint.proceed();
- }
- }
- }

注解使用
- @Slf4j
- @RestController
- @RequestMapping("/resubmit")
- public class ResubmitController {
- @PostMapping("/submit")
- @Resubmit(delay = 1,timeUnit = TimeUnit.MINUTES)
- public CommonResult submit(@RequestBody Map<String, Object> body){
- log.info("接收到参数:{}", JSON.toJSONString(body));
- return CommonResult.success();
- }
- }
其实前面介绍过的加唯一索引或者加防重表,本质是使用了数据库的分布式锁,也属于分布式锁的一种。但由于数据库分布式锁的性能不太好,我们可以改用分布式锁,在分布式环境下,锁定全局唯一资源,使请求串行化。分布式锁有很多实现方案,比如redis和zookeeper。
其实就是把多线程并发锁的思路,引入分布式系统,也就是分布式系统中的解决思路。
具体步骤:
注意:分布式锁一定要设置一个合理的过期时间,如果设置过短,无法有效的防止重复请求。如果设置过长,可能会浪费redis的存储空间,需要根据实际业务情况而定。
token 机制的核心思想是为每一次操作生成一个唯一性的凭证,也就是 token。一个 token 在操作的每一个阶段只有一次执行权,一旦执行成功则保存执行结果。对重复的请求,返回同一个结果。token 机制的应用十分广泛。
该方案跟之前的所有方案都有点不一样,需要两次请求才能完成一次业务操作。
具体步骤:
代码片段如下
- @Slf4j
- @Service
- public class TokenUtilService {
- @Autowired
- private StringRedisTemplate redisTemplate;
-
- /**
- * 存入 Redis 的 Token 键的前缀
- */
- private static final String IDEMPOTENT_TOKEN_PREFIX = "idempotent_token:";
-
- /**
- * 创建 Token 存入 Redis,并返回该 Token
- *
- * @param value 用于辅助验证的 value 值
- * @return 生成的 Token 串
- */
- public String generateToken(String value) {
- // 实例化生成 ID 工具对象
- String token = UUID.randomUUID().toString();
- // 设置存入 Redis 的 Key
- String key = IDEMPOTENT_TOKEN_PREFIX + token;
- // 存储 Token 到 Redis,且设置过期时间为5分钟
- redisTemplate.opsForValue().set(key, value, 5, TimeUnit.MINUTES);
- // 返回 Token
- return token;
- }
-
- /**
- * 验证 Token 正确性
- *
- * @param token token 字符串
- * @param value value 存储在Redis中的辅助验证信息
- * @return 验证结果
- */
- public boolean validToken(String token, String value) {
- // 设置 Lua 脚本,其中 KEYS[1] 是 key,KEYS[2] 是 value
- String script = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end";
- RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
- // 根据 Key 前缀拼接 Key
- String key = IDEMPOTENT_TOKEN_PREFIX + token;
- // 执行 Lua 脚本
- Long result = redisTemplate.execute(redisScript, Arrays.asList(key, value));
- // 根据返回结果判断是否成功成功匹配并删除 Redis 键值对,若果结果不为空和0,则验证通过
- if (result != null && result != 0L) {
- log.info("验证 token={},key={},value={} 成功", token, key, value);
- return true;
- }
- log.info("验证 token={},key={},value={} 失败", token, key, value);
- return false;
- }
- }

- @Slf4j
- @RestController
- public class TokenController {
- @Autowired
- private TokenUtilService tokenService;
-
- /**
- * 获取 Token 接口
- *
- * @return Token 串
- */
- @GetMapping("/token")
- public String getToken() {
- // 获取用户信息(这里使用模拟数据)
- // 注:这里存储该内容只是举例,其作用为辅助验证,使其验证逻辑更安全,如这里存储用户信息,其目的为:
- // - 1)、使用"token"验证 Redis 中是否存在对应的 Key
- // - 2)、使用"用户信息"验证 Redis 的 Value 是否匹配。
- String userInfo = "mydlq";
- // 获取 Token 字符串,并返回
- return tokenService.generateToken(userInfo);
- }
-
- /**
- * 接口幂等性测试接口
- *
- * @param token 幂等 Token 串
- * @return 执行结果
- */
- @PostMapping("/test")
- public String test(@RequestHeader(value = "token") String token) {
- // 获取用户信息(这里使用模拟数据)
- String userInfo = "mydlq";
- // 根据 Token 和与用户相关的信息到 Redis 验证是否存在对应的信息
- boolean result = tokenService.validToken(token, userInfo);
- // 根据验证结果响应不同信息
- return result ? "正常调用" : "重复调用";
- }
- }

- @SpringBootTest
- @Slf4j
- class IdempotenceApplicationTests {
- @Autowired
- private WebApplicationContext webApplicationContext;
- @Test
- public void interfaceIdempotenceTest() throws Exception {
- // 初始化 MockMvc
- MockMvc mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
- // 调用获取 Token 接口
- String token = mockMvc.perform(MockMvcRequestBuilders.get("/token")
- .accept(MediaType.TEXT_HTML))
- .andReturn()
- .getResponse().getContentAsString();
- log.info("获取的 Token 串:{}", token);
- // 循环调用 5 次进行测试
- for (int i = 1; i <= 5; i++) {
- log.info("第{}次调用测试接口", i);
- // 调用验证接口并打印结果
- String result = mockMvc.perform(MockMvcRequestBuilders.post("/test")
- .header("token", token)
- .accept(MediaType.TEXT_HTML))
- .andReturn().getResponse().getContentAsString();
- log.info(result);
- // 结果断言
- if (i == 1) {
- Assert.equals(result,"正常调用");
- } else {
- Assert.equals(result,"重复调用");
- }
- }
- }
- }

使用redisson提供的RBucket,实现效果也还不错
示例代码如下,模拟了20个线程同时操作,最终只会有一个线程顺利操作
- @Test
- void testRBucket() {
- RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");
- countDownLatch.trySetCount(20);
- RBucket<String> bucket = redissonClient.getBucket("mysuc");
- bucket.set("内容1 ");
- String oldValue = bucket.get();
- ExecutorService executorService = Executors.newFixedThreadPool(20);
- for (int i = 0; i < 20; i++) {
- executorService.submit(() -> {
- if (bucket.compareAndSet(oldValue, "内容2")) {
- System.out.println("线程" + Thread.currentThread().getId() + "更新了bucket的值");
- }
- countDownLatch.countDown();
- });
- }
- try {
- countDownLatch.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }

所谓请求序列号,其实就是每次向服务端请求时候附带一个短时间内唯一不重复的序列号,该序列号可以是一个有序 ID,也可以是一个订单号,一般由下游生成,在调用上游服务端接口时附加该序列号。这种方式类似于分布式锁。
主要步骤:
① 下游服务生成分布式 ID 作为序列号,然后执行请求调用上游接口,并附带“唯一序列号”与请求的“认证凭据ID”。
② 上游服务进行安全效验,检测下游传递的参数中是否存在“序列号”和“凭据ID”。
③ 上游服务到 Redis 中检测是否存在对应的“序列号”与“认证ID”组成的 Key,如果存在就抛出重复执行的异常信息,然后响应下游对应的错误信息。如果不存在就以该“序列号”和“认证ID”组合作为 Key,以下游关键信息作为 Value,进而存储到 Redis 中(根据业务要求设置失效时间),然后正常执行接来来的业务逻辑。
注意:往Redis中设置Key的时候 一定要设置过期时间。这样能保证在这个时间范围内,如果重复调用接口,则能够进行判断识别。如果不设置过期时间,很可能导致数据无限量的存入Redis,致使Redis不能正常工作。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。