赞
踩
幂等
是一个数学与计算机科学概念,英文 idempotent [aɪˈdempətənt]。
f(x) = f(f(x))
。比如 求绝对值 的函数,就是幂等的,abs(x) = abs(abs(x))。满足幂等条件的性能叫做 幂等性
。
我们开发一个转账功能,假设我们调用下游接口 超时 了。一般情况下,超时可能是网络传输丢包的问题,也可能是请求时没送到,还有可能是请求到了,返回结果却丢了。这时候我们是否可以 重试 呢?如果重试的话,是否会多赚了一笔钱呢?
在我们日常开发中,会存在各种不同系统之间的相互远程调用。调用远程服务会有三个状态:成功
、失败
、超时
。
前两者都是明确的状态,但超时则是 未知状态。我们转账 超时 的时候,如果下游转账系统做好 幂等性校验,我们判断超时后直接发起重试,既可以保证转账正常进行,又可以保证不会多转一笔。
日常开发中,需要考虑幂等性的场景:
前端重复提交
:比如提交 form 表单时,如果快速点击提交按钮,就可能产生两条一样的数据。用户恶意刷单
:例如在用户投票这种功能时,如果用户针对一个用户进行重复提交投票,这样会导致接口接收到用户重复提交的投票信息,会使投票结果与事实严重不符。接口超时重复提交
:很多时候 HTTP 客户端工具都默认开启超时重试的机制,尤其是第三方调用接口的时候,为了防止网络波动等造成的请求失败,都会添加重试机制,导致一个请求提交多次。MQ重复消费
:消费者读取消息时,有可能会读取到重复消息。如果我们调用下游接口超时了,我们应该如何处理?其实从生产者和消费者两个角度来看,有两种方案处理:
两种方案都是可以的,但如果是 MQ重复消费的场景,方案一处理并不是很妥当,所以我们还是要求下游系统 对外接口支持幂等。
幂等性是为了简化客户端逻辑处理,能防止重复提交等操作,但却增加了服务端的逻辑复杂性和成本,其主要是:
在使用前,需要根据实际业务场景具体分析,除了业务上的特殊要求外,一般情况下不需要引入接口的幂等性。
Restful 推荐的几种 HTTP 接口方法中,不同的请求对幂等性的要求不同:
请求类型 | 是否幂等 | 描述 |
---|---|---|
GET | 是 | GET 方法用于获取资源。一般不会也不应当对系统资源进行改变,所以是幂等的。 |
POST | 否 | POST 方法用于创建新的资源。每次执行都会新增数据,所以不是幂等的。 |
PUT | 不一定 | PUT 方法一般用于修改资源。该操作分情况判断是否满足幂等,更新中直接根据某个值进行更新,也能保持幂等。不过执行累加操作的更新是非幂等的。 |
DELETE | 不一定 | DELETE 方法一般用于删除资源。该操作分情况判断是否满足幂等,当根据唯一值进行删除时,满足幂等;但是带查询条件的删除则不一定满足。例如:根据条件删除一批数据后,又有新增数据满足该条件,再执行就会将新增数据删除,需要根据业务判断是否校验幂等。 |
日常开发中,为了实现接口幂等性校验,可以这样实现:
try-catch
捕获。补充: 也可以新建一张 防止重复点击表,将唯一标识放到表中,存为主键或唯一索引,然后配合 tra-catch 对重复点击的请求进行处理。
伪代码如下:
/** * 幂等处理 */ Rsp idempotent(Request req){ try { insert(req); } catch (DuplicateKeyException e) { //拦截是重复请求,直接返回成功 log.info("主键冲突,是重复请求,直接返回成功,流水号:{}",bizSeq); return rsp; } //正常处理请求 dealRequest(req); return rsp; }
乐观锁
:乐观锁在操作数据时,非常乐观,认为别人不会同时在修改数据。因此乐观锁不会上锁,只是在执行更新的时候判断一下,在此期间是否有人修改了数据。
乐观锁的实现:
就是给表多加一列 version 版本号,每次更新数据前,先查出来确认下是不是刚刚的版本号,没有改动再去执行更新,并升级 version(version=version+1)。
比如,我们更新前,先查一下数据,查出来的版本号是 version=1。
select order_id,version from order where order_id='666';
然后使用 version=1 和 订单ID 一起作为条件,再去更新:
update order set version = version +1,status='P' where order_id='666' and version =1
最后,更新成功才可以处理业务逻辑,如果更新失败,默认为重复请求,直接返回。
流程图如下:
为什么版本号建议自增呢?
因为乐观锁存在 ABA 的问题,如果 version 版本一直是自增的就不会出现 ABA 的情况。
悲观锁
:通俗点讲就是很悲观,每次去操作数据时,都觉得别人中途会修改,所以每次在拿数据的时候都会上锁。官方点讲就是,共享资源每次只给一个线程使用,其他线程阻塞,用完后再把资源转让给其它资源。
悲观锁的实现:
在订单业务场景中,假设先查询出订单,如果查到的是处理中状态,就处理完业务,然后再更新订单状态为完成。如果查到订单,并且不是处理中的状态,则直接返回。
可以使用数据库悲观锁(select … for update)解决这个问题:
begin; # 1.开始事务
select * from order where order_id='666' for update # 查询订单,判断状态,锁住这条记录
if(status !=处理中){
//非处理中状态,直接返回;
return ;
}
## 处理业务逻辑
update order set status='完成' where order_id='666' # 更新完成
commit; # 5.提交事务
注意:
很多业务表,都是由状态的,比如:转账流水表,就会有 0-待处理,1-处理中,2-成功,3-失败的状态。转账流水更新的时候,都会涉及流水状态更新,即涉及 状态机(即状态变更图)。我们可以利用状态机来实现幂等性校验。
状态机的实现:
比如:转账成功后,把 处理中 的转账流水更新为成功的状态,SQL 如下:
update transfor_flow set status = 2 where biz_seq='666' and status = 1;
流程图如下:
伪代码实现如下:
Rsp idempotentTransfer(Request req){ String bizSeq = req.getBizSeq(); int rows= "update transfr_flow set status=2 where biz_seq=#{bizSeq} and status=1;" if(rows==1){ log.info(“更新成功,可以处理该请求”); //其他业务逻辑处理 return rsp; } else if(rows == 0) { log.info(“更新不成功,不处理该请求”); //不处理,直接返回 return rsp; } log.warn("数据异常") return rsp: }
token 唯一令牌方案一般包括两个请求阶段:
流程图如下:
redis.del(token)
的方式,如果存在会删除成功,即处理业务逻辑,如果删除失败,则直接返回结果。补充: 这种方式个人不推荐,说两方面原因:
- 需要前后端联调才能实现,存在沟通成本,最终效果可能与设想不一致。
- 如果前端多次获取多个 token,还是可以重复请求的,如果再在获取 token 处加分布式锁控制,就不如直接用分布式锁来控制幂等性了,即下面这种解决方式。
分布式锁
实现幂等性的逻辑就是,请求过来时,先去尝试获取分布式锁,如果获取成功,就执行业务逻辑,反之获取失败的话,就舍弃请求直接返回成功。
流程图如下:
setIfAbsent()
来实现,注意分布式锁的 key 必须为业务的唯一标识。@NotRepeat 注解用于修饰需要进行幂等性校验的类。
NotRepeat.java
import java.lang.annotation.*;
/**
* 幂等性校验注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface NotRepeat {
}
AOP切面监控被 @Idempotent 注解修饰的方法调用,实现幂等性校验逻辑。
IdempotentAOP.java
import com.demo.util.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.After; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; import java.util.concurrent.TimeUnit; /** * 重复点击校验 */ @Slf4j @Aspect @Component public class IdempotentAOP { /** Redis前缀 */ private String API_IDEMPOTENT_CHECK = "API_IDEMPOTENT_CHECK:"; @Resource private HttpServletRequest request; @Resource private RedisUtils redisUtils; /** * 定义切面 */ @Pointcut("@annotation(com.demo.annotation.NotRepeat)") public void notRepeat() { } /** * 在接口原有的方法执行前,将会首先执行此处的代码 */ @Before("notRepeat()") public void doBefore(JoinPoint joinPoint) { String uri = request.getRequestURI(); // 登录后才做校验 UserInfo loginUser = AuthUtil.getLoginUser(); if (loginUser != null) { assert uri != null; String key = loginUser.getAccount() + "_" + uri; log.info(">>>>>>>>>> 【IDEMPOTENT】开始幂等性校验,加锁,account: {},uri: {}", loginUser.getAccount(), uri); // 加分布式锁 boolean lockSuccess = redisUtils.setIfAbsent(API_IDEMPOTENT_CHECK + key, "1", 30, TimeUnit.MINUTES); log.info(">>>>>>>>>> 【IDEMPOTENT】分布式锁是否加锁成功:{}", lockSuccess); if (!lockSuccess) { if (uri.contains("contract/saveDraftContract")) { log.error(">>>>>>>>>> 【IDEMPOTENT】文件保存中,请稍后"); throw new IllegalArgumentException("文件保存中,请稍后"); } else if (uri.contains("contract/saveContract")) { log.error(">>>>>>>>>> 【IDEMPOTENT】文件发起中,请稍后"); throw new IllegalArgumentException("文件发起中,请稍后"); } } } } /** * 在接口原有的方法执行后,都会执行此处的代码(final) */ @After("notRepeat()") public void doAfter(JoinPoint joinPoint) { // 释放锁 String uri = request.getRequestURI(); assert uri != null; UserInfo loginUser = SysUserUtil.getloginUser(); if (loginUser != null) { String key = loginUser.getAccount() + "_" + uri; log.info(">>>>>>>>>> 【IDEMPOTENT】幂等性校验结束,释放锁,account: {},uri: {}", loginUser.getAccount(), uri); redisUtils.del(API_IDEMPOTENT_CHECK + key); } } }
RedisUtils.java
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.concurrent.TimeUnit; /** * redis工具类 */ @Slf4j @Component public class RedisUtils { /** * 默认RedisObjectSerializer序列化 */ @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 加分布式锁 */ public boolean setIfAbsent(String key, String value, long timeout, TimeUnit unit) { return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); } /** * 释放锁 */ public void del(String... keys) { if (keys != null && keys.length > 0) { //将参数key转为集合 redisTemplate.delete(Arrays.asList(keys)); } } }
OrderController.java
import com.demo.annotation.NotRepeat; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Arrays; import java.util.List; /** * 幂等性校验测试类 */ @RequestMapping("/order") @RestController public class OrderController { @NotRepeat @GetMapping("/orderList") public List<String> orderList() { // 查询列表 return Arrays.asList("Order_A", "Order_B", "Order_C"); // throw new RuntimeException("参数错误"); } }
请求地址:http://localhost:8080/order/orderList
日志信息如下:
经测试,加锁后,正常处理业务、抛出异常都可以正常释放锁。
整理完毕,完结撒花~ 声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。