赞
踩
任意多次执行 和 执行一次 产生的业务影响相同
系统 | 概述 | 潜在风险 |
---|---|---|
支付系统 | 同一笔订单,用户点击两次支付 | 用户重复支付 |
库存系统 | 同一笔调度请求,库存重复消费处理 | 超卖、库存错误 |
问题 | 概述 | 详情 |
---|---|---|
缺少分布式锁(查询) | 缺少锁 | 并发修改售卖量,导致售卖量被覆盖 |
分布式锁过期时间设置不合理 | 分布式锁超时时间小于事务执行时间事务重复提交,锁自动过期,且第一个事务还未执行完 | 导致锁失效 |
数据库大小写导致幂等冲突 | 数据库字段大小不写敏感 | DB00x 和 Db00X,对应的是同一个号。插入db会唯一键冲突 |
幂等UniqueKey异常未抛出 | 数据库捕获幂等key异常异常未抛出,导致下游事务重复 | 幂等异常,理论上需要抛出来,终止流程。 |
数据一致性问题 | 服务A依赖服务B,B服务超时(实际事务已成功)或主从延时A事务置为失败,且未重试 | 导致实际上服务B有数据,但是因为超时,服务A没有拿到数据。服务A后续流程都”业务“上失败了时间上服务B写入数据了,服务A读的时候,主从延时没有读取到。方法一:走主,但可能压力大方法方法二:正常情况下走从,A的total和B获取到的total不一致时再走主 |
上下游任务状态一致性问题 | A服务异常终止了,任务状态置为失败同时需要告知上游,此任务失败了。保证上下游任务状态一致性 | A服务的任务失败了,但是状态还是初始化告诉上游的B服务,任务状态为失败(终态)造成上下游任务状态不一致 |
调用方实现合理的重试策略,被调用方实现应对重试的幂等策略
单据号幂等、任务状态防重
一般是拒绝、忽略、日志打出来
场景分类 | 常见case | 识别方案 |
---|---|---|
有业务唯一标识 | 同一个单据号下发两次任务号 | 以唯一标识判断请求是否重复 |
无业务唯一标识 | “相同”类型的触发任务:同天、同地区、同业务类型。算是相同类型的任务 | 走主查询db,任务A是否已经存在了,且状态为未完成。是的话,则相同类型的任务B需要被拒绝掉 |
MQ、接口调用、数据库唯一键
约束方式 | 优势 | 劣势 | 建议使用业务场景 | |
---|---|---|---|---|
唯一键约束 | 数据库UniqueKey | 利用存储优势,确保数据的唯一性 | 灵活性较差业务UniqKey发生变动或者不动场景的UniqKey不一致,会带来较大改动高并发下容易产生主从延迟,影响业务 | 订单、支付等常见唯一单据使用操作 |
insert前先select | 减少主库压力资源串行【推荐】 | 数据库集群模式下,主从延迟,会带来风险核心的防重逻辑,可以走主select极端场景下,防不住并发(两个相同的MQ同时过来),即使走主页可能防不住并发 | 库存、余额扣减操作 | |
分布式锁redis的setNx | 内存计算,高并发可以防重、也可以防并发【推荐】 | 需要防并发 + 幂等的场景MQ可能重复,使用redis的setNx做幂等相同的MQ可能同时过来,使用redis的setNx防并发 | ||
防重表 | 灵活性较高,可根据不同的业务场景建立UniqKey | 带来额外存储基本是等量业务单据量的存储 | 业务唯一key变更频繁 | |
状态机约束 | 依据业务本身流转进行乐观锁约束可以防重、也可以防并发【推荐】 | 任务、单据有对应的完整状态机。 | ||
悲观锁约束 | 绝对化串行,利用数据库特性保证资源操作的原子性 | 容易出现死锁,对数据库性能有影响 | 账户流水余额操作:依据某条流水、增或减用户余额不建议使用 |
唯一键约束-利用数据库的UniqueKey
最为常见的约束重复单据的方式,利用数据库的唯一键进行兜底
加了唯一索引之后,第一次请求数据可以插入成功。但后面的相同请求,插入数据时会报Duplicate entry 异常,表示唯一索引有冲突
业务中捕获DuplicateKeyException异常,并返回成功
和状态机update方式思想类似,不过状态机方式不用担心后续唯一键会改变 以及 也不用吃异常
唯一键约束-insert前先select
先查,没有再写逻辑
但是主从延时场景下,select可能不准 -> ,主库压力不大场景下,可以走主select
但是高并发的场景下,t1和t2,即使都select主,发现都没有数据,结果写时,则会有一个写报错或不符合预期。
可以在select之前,再加一层redis锁setNx,专门用来防并发
唯一键约束-分布式锁[redission、zk]
但分布式锁只能保障一段时间内最多只有一个操作,无法保障无论什么时候都只允许一个操作(除非不设置锁超时的时间也不释放锁)
方式 | 适合场景 | 优点 | 缺点 |
---|---|---|---|
关系数据库mysql | 基于幂等key排除重复场景资源抢占不激烈 | 数据库特有原子性和一致性,使用简单 | 基于事务,所以难以实现“阻塞等待”特性,锁竞争多时链接多不支持锁超时功能,需额外任务实现高并发下性能相对较差、竞争过多是会导致链接变多 |
K-V存储redis | 高并发资源强占场景可以支持防并发+防重 | 性能好,时延低适合高并发场景天然支持ttl解锁 | 集群模式下,master阶段宕机有都是数据风险(redlock一定程度商可解决)TTL机制强依赖系统时钟,在极端情况下 时钟漂移问题可能会导致问题 |
分布式一致性协调服务zookeeper | 持有锁场景较长的场景对锁安全性要求较高 | 天生为分布式协调而生,依据自有特性天然支持 | 读写性能较差**,在高并发场景下不适合** |
状态机约束
点击展开内容
int result = update();
if(result <= 0) {
return;//是重复的任务,可以忽略掉
}
影响方 | 方案 | 常见case | 概述 |
---|---|---|---|
上游 | 返回错误 | 前端重复提交同一时间创建多个相同类型的任务 | 同时执行会带来业务损失,且需要让上游感知 |
返回成功 | 异步消息上游重复发送eg:如果返回上游成功,则下游的任务状态也应该是成功,保持状态的一致性。 | 返回上游成功了,上游可能有重试机制,对于重试的又需要防重。防重逻辑中可能涉及对上一个任务状态的判断,若上一个任务状态上下游没有保持状态的一致性,则有可能导致防重失败。 | |
下游 | 放行下游 | 暂不了解场景 |
方案 | 简介 | 场景 |
---|---|---|
retryLog | 对于失败的任务插入DB 日志按照一定周期轮询失败任务,进行重试每隔5min重新跑一次定时任务 | 下游任务执行返回失败,上游自身,再重试需要明确哪些异常、状态才能重试 |
消息队列 | 异步解耦,并发高单个消费保证 at least once,也能保证任务一定被执行 | |
最大努力通知型事务 | 可靠的重试操作,保障业务数据的最终一致性支持TCC、柔性事务等 |
代码1
public String buildLockName(String orderId){
return distributedLockNamePrefix + "_" + orderId;
}
代码2
public static final int orderPayLockExpireTime = 60; //分布式锁过期时间,单位秒 public void payOrderId(String orderId){ String lockName = buildLockName(orderId); Lock lock = distributedLockManager.getReentrantLock(lockName, orderPayLockExpireTime); try{ // 尝试获取锁失败 if(!lock.tryLock()){ log.warn("尝试获取分布式锁失败,订单已经在处理中"); throw new Exception("订单已在处理中,请勿重复支付"); } // 修改数据库订单状态为"支付中" updateOrderToPaying(orderId); // 真正处理支付请求 doOrderPay(orderId); // 修改数据库订单状态为"支付成功" updateOrderToSuccess(orderId); }catch (Exception e){ // 异常处理.... handleException(e); }finally { lock.unlock(); } }
代码3
//更新数据库中的订单状态为支付中
public void updateOrderToPaying(String orderId) throws Exception {
// 根据订单id将订单更新为支付中
int row = orderRepo.updateOrderStatus(orderId, StatusEnum.ORDER_READY_TO_PAY.getCode(),StatusEnum.ORDER_PAYING.getCode();
if (row < 1) {
throw new Exception("订单状态错误,订单初始状态不是待支付");
}
}
代码4
//真正处理支付请求 public OrderPayResponse doOrderPay(String orderId){ try{ OrderPayRequest orderPayRequest = buildOrderPayRequest(orderId); OrderPayResponse orderPayResponse = payTService.pay(orderPayRequest); if(orderPayResponse.getCode != ResponseStatus.SUCCESS){ log.warn("请求支付平台返回错误,{}",orderPayResponse); throw new BusinessException(OrderPayResponseCodeEnum.FAIL.getCode(),"支付平台返回错误"); } return orderPayResponse; } catch (Exception e){ log.error("请求支付平台发生异常,orderId:{}",orderId,e); // 判断哪些异常是可以让业务发起重试 if(e instanceof ExceptionCanRetry){ // 可以直接让业务发起重试的异常 throw new BusinessException(OrderPayResponseCodeEnum.RETRY.getCode(),"支付平台返回错误,可重试"); } // 其余情况都不能直接发起重试,需要明确告诉业务 throw new BusinessException(OrderPayResponseCodeEnum.NO_RETRY.getCode(),"支付平台返回错误,不可重试"); } }
//修改订单状态为支付成功
public void updateOrderToSuccess(String orderId){
// 根据订单id将订单更新为支付中
int row = orderRepo.updateOrderStatus(orderId, StatusEnum.ORDER_PAYING.getCode(), StatusEnum.ORDER_PAY_SUCCESS.getCode();
if (row < 1) {
throw new Exception("更新订单成功错误,订单初始状态不是支付中");
}
}
代码6
` //处理异常
public void handleException(Exception e) throws Exception {
// 将订单状态变为失败
updateOrderToFail(orderId);
// 可以直接进行重试的异常
if (e instanceof ExceptionCanRetry) {
// write Into retryLog
// ..... throw e;
}
// 异常无法进行重试,报警处理,需要人工进行介入查看
alaramService.pushNotify();
throw e;
} `
步骤 | 说明 | 代码示例 |
---|---|---|
1.锁单据 | 组装分布式锁key订单id唯一键,一个订单在同一时间只能有一个线程在处理 | 代码1 |
尝试获取锁,统一使用分布式锁组件失败则直接报错(表明订单已经被其他线程处理中,支付中) | 代码2 | |
2.更改数据库状态 | 将数据库中订单状态由"待支付"变为"支付中" | 代码3 |
3.发送请求给下游支付平台 | 发送请求给支付平台明确规定哪些异常可以重试,哪些不能重试 | 代码4 |
4.修改订单状态为成功 | 数据库中订单状态由"支付中"变为"支付成功" | 代码5 |
5.Catch Exception异常处理 | 针对异常,需要分情况进行处理特定场景的异常可以由系统直接发起支付,将数据库中的单据改为失败状态,然后由重试retryLog拉起重新支付其余未知异常,默认系统无法自动处理,需要报警人工介入 | 代码6 |
6.主动释放锁 | finally中释放分布式锁确保锁被主动释放、确保即使有异常也不会影响锁的释放 | 见步骤1 |
为什么分布式锁获取失败需要直接丢弃,而不是阻塞等待
此处分布式锁的作用是为了防止单据重复,业务特性确认一个订单只能有一次请求,所以重复请求直接丢弃
为什么有了数据库状态的单向变更乐观锁,前面还需要缓存分布式锁去重操作?
减少极端情况下,大量重复请求对数据库的压力
给支付平台发送请求,为什么需要区分可重试和不可重试的两大类异常?
针对特定的错误类型,系统做到可重试,无需人工介入
2.针对不可重试错误类型,必须人工和下游确认后再次进行发起,避免造成金额损失
代码1
//给账户余额加上amount金额 public void addMountToAccount(String accountId,Long amount){ String lockName = buildLockName(accountId); Lock lock = distributedLockManager.getReentrantLock(lockName, accountLockExpireTime); try{ lock.lock(); // 获取账户原始余额 Long originAmount = getAccountAmount(accountId); // 更新后的余额 Long newAmount = originAmount + amount; // 更新账户余额 updateAccountAmount(accountId,newAmount,originAmount); }catch (Exception e){ // 异常处理.... handleException(e); }finally { lock.unlock(); } } `
代码2
/**
* 更新账户余额 *
* @param accountId 账户id
* @param originAmount 库中现有余额
* @param newAmount 更新后的余额
* @return */
public void updateAccountAmount(String accountId, Long originAmount, Long newAmount) {
// update account set amount = $newAmount where id = $accountId and amount = $originAmount;
int row = accountRepo.updateAmountByAccountId(accountId, originAmount, newAmount);
if (row < 1) {
throw new Exception("更新账户余额错误");
}
}
代码3
@Schedule("reprocess_task")//执行定时补偿任务 public void reProcessTask(){ Integer offset = 0; Integer limit = 10; while (true) { List<Task> taskList = reTryLogRepo.getUnProcessList(offset,limit); // 未找到待执行的任务,则退出 if(CollectionUtils.isEmpty(taskList)){ return; } for(Task task:taskList){ // 重试任务中的重试次数+1 task.addRetryTime(); boolean resultFlag = doReprocess(task); if(resultFlag){ // task置为成功 task.setTaskSuccess(); } // 更新DB中的task信息, //..... // 判断任务是否达到重试最大次数 if(task.reachMaxRetryTime()){ // 报警 } } offset += limit; } }
步骤 | 说明 | 代码示例 |
---|---|---|
1.锁单据 | 组装分布式Key账户id操作具有原子性, | public static final String distributedLockNamePrefix = "account_id_"; 、 |
失败则阻塞等待(账户操作串行执行) | 代码1 | public String buildLockName(Long accountId){ return distributedLockNamePrefix + “_” + accountId; } |
2.获取账户现有余额 | 根据账户id获取现有账户余额originAmount无需强制读主库 | public Long getAccountAmount(String accountId){ return accountRepo.getAmountByAccountId(accountId); } |
3.更新账户余额 | 根据账户id和现有余额,更新账户新的余额 | 代码2 |
4.Catch Exception异常处理 | 所有异常一律进入reTryLog | |
5.重试任务处理 | 定时任务处理重试任务 | 代码3 |
为什么未获取到锁需要阻塞等待
为什么不能直接用数据库余额累加。amount+=addAmount
例:账户余额有300元,需要给账户加100元。由于某种原因,重复请求给了DB。
明显方案1中会出现金额错误
为什么获取余额时,不用强制读主库,主从延迟会不会有影响?
不会有影响,update操作的where条件会强制读主库。当主从延迟存在时候,此时更新会报错
为什么所有异常一律进入reTryLog,不用区分不同异常来做重试
场景 | 事项 | 备注 |
---|---|---|
数据库插入数据 | [强制]有业务唯一键,数据库Unique key建立或幂等表 | |
[强制]线上禁止delete语句(包括业务逻辑和刷数) | ||
[强制]catch 唯一键重复异常 DuplicateKeyException | 备注不要cache SQLIntegrityConstraintViolationException 完整性约束异常,如果未给一个必填字段设值,也会抛这个异常。 | |
[建议]没有业务唯一键,根据实际情况先select再insert | ||
[建议]利用数据库select判重,防止主从延迟问题 | 备注对于强一致业务需求,建议强制读主库数据库有Unique Key兜底,select从库 | |
分布式锁使用 | [强制]分布式锁必须设置过期时间 | |
[建议]有业务唯一属性,插入&&更新操作前使用分布式锁进行并发判重 | public void process(){ try{ // 尝试获取锁 lock.tryLock(); //... }catch (Exception e){ log.error(""); return }finally{ lock.release(); } } | |
[建议]根据实际业务判断锁被占用的情况 | 情况一:阻塞等待:常见于业务需要串行执行的情况 (例如订单支付的消息还在处理中,订单完成的消息就来了) 情况二:丢弃:常见于相同消息重复发送 (例如订单支付的消息同时来了两条) | |
[建议]合理设置锁过期时间 | ||
[强制]手动释放锁资源 | public void process(){ try{ // 尝试获取锁 lock.tryLock(); //... }catch (Exception e){ log.error(""); return }finally{ lock.release(); } | |
[强制]确保释放锁是本线程加的锁,避免错误释放其他线程的锁 | ||
单库事务使用 | [强制]insert、update多表联动加事务注解 | @Transactional public void updateFoo(Foo foo) { // DB transaction1 // DB transaction2 } |
[强制]有状态单据,乐观锁更新 | //更新订单为成功状态 public void updateOrderToSuccess(String orderId){ // 将订单从ready状态更新为success // update order set status="success" where order_id=$orderId and status="ready" int affectedRows = orderDao.updateOrderFromReady(orderId); if(affectedRows < 1){ throw new Exception("update fail"); } } | |
[强制]事务注解内禁止调用RPC接口/MQ | ||
跨库事务 | [强制]分布式事务失败后必须有重试机制 | 概述系统自动重试,依赖reTryLog,最大努力通知,人工介入处理,高优报警 |
[建议]柔性事务保证最终一致性 | ||
重试机制 | [强制]重试机制达到最大次数后,需要人工介入 | |
[强制]重试任务和正常业务流程共用方法类 | ||
[建议]分时段阶梯重试 | ||
异常处理 | [强制]异常信息必须向上抛出或者记录,不可直接丢弃 | public void process(){ try{ doSomething(); }catch (Exception e){ throw BusinessException(); log.error("something is wrong"); return; } } |
[强制]涉及金额打款等核心逻辑异常,报警必须人工感知 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。