通过之前的几篇文章我相信您已经搭建好了运行环境,本次的项目实战是依照happylifeplat-tcc-demo项目来演练,也是非常经典的分布式事务场景:支付成功,进行订单状态的更新,扣除用户账户,库存扣减这几个模块来进行tcc分布式事务。话不多说,让我们一起进入体验吧!
- 首先我们找到 PaymentServiceImpl.makePayment 方法,这是tcc分布式事务的发起者
- //tcc分布式事务的发起者
- @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
- public void makePayment(Order order) {
- order.setStatus(OrderStatusEnum.PAYING.getCode());
- orderMapper.update(order);
- //扣除用户余额 远端的rpc调用
- AccountDTO accountDTO = new AccountDTO();
- accountDTO.setAmount(order.getTotalAmount());
- accountDTO.setUserId(order.getUserId());
- accountService.payment(accountDTO);
- //进入扣减库存操作 远端的rpc调用
- InventoryDTO inventoryDTO = new InventoryDTO();
- inventoryDTO.setCount(order.getCount());
- inventoryDTO.setProductId(order.getProductId());
- inventoryService.decrease(inventoryDTO);
- }
- //更新订单的confirm方法
- public void confirmOrderStatus(Order order) {
- order.setStatus(OrderStatusEnum.PAY_SUCCESS.getCode());
- orderMapper.update(order);
- LOGGER.info("=========进行订单confirm操作完成================");
-
-
- }
- //更新订单的cancel方法
- public void cancelOrderStatus(Order order) {
- order.setStatus(OrderStatusEnum.PAY_FAIL.getCode());
- orderMapper.update(order);
- LOGGER.info("=========进行订单cancel操作完成================");
- }复制代码
makePayment 方法是tcc分布式事务的发起者,它里面有更新订单状态(本地),进行库存扣减(rpc扣减),资金扣减(rpc调用)
confirmOrderStatus 为订单状态confirm方法,cancelOrderStatus 为订单状态cancel方法。
我们重点关注 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus") 这个注解,为什么加了这个注解就有神奇的作用。
@Tcc注解切面
- 我们找到在happylifeplat-tcc-core包中找到
TccTransactionAspect,这里定义@Tcc的切点。
- @Aspect
- public abstract class TccTransactionAspect {
-
- private TccTransactionInterceptor tccTransactionInterceptor;
-
- public void setTccTransactionInterceptor(TccTransactionInterceptor tccTransactionInterceptor) {
- this.tccTransactionInterceptor = tccTransactionInterceptor;
- }
-
- @Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")
- public void txTransactionInterceptor() {
-
- }
-
- @Around("txTransactionInterceptor()")
- public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
- return tccTransactionInterceptor.interceptor(proceedingJoinPoint);
- }
-
- public abstract int getOrder();
- }复制代码
我们可以知道Spring实现类的方法凡是加了@Tcc注解的,在调用的时候,都会进行 tccTransactionInterceptor.interceptor 调用。
其次我们发现该类是一个抽象类,肯定会有其他类继承它,你猜的没错,对应dubbo用户,他的继承类为:
- @Aspect
- @Component
- public class DubboTccTransactionAspect extends TccTransactionAspect implements Ordered {
-
-
- @Autowired
- public DubboTccTransactionAspect(DubboTccTransactionInterceptor dubboTccTransactionInterceptor) {
- super.setTccTransactionInterceptor(dubboTccTransactionInterceptor);
- }
-
-
- @Override
- public int getOrder() {
- return Ordered.HIGHEST_PRECEDENCE;
- }
- }复制代码
- springcloud的用户,它的继承类为:
- @Aspect
- @Component
- public class SpringCloudTxTransactionAspect extends TccTransactionAspect implements Ordered {
-
-
- @Autowired
- public SpringCloudTxTransactionAspect(SpringCloudTxTransactionInterceptor springCloudTxTransactionInterceptor) {
- this.setTccTransactionInterceptor(springCloudTxTransactionInterceptor);
- }
-
-
- public void init() {
-
- }
-
- @Override
- public int getOrder() {
- return Ordered.HIGHEST_PRECEDENCE;
- }
- }复制代码
- 我们注意到他们都实现了Spring的Ordered接口,并重写了 getOrder 方法,都返回了 Ordered.HIGHEST_PRECEDENCE 那么可以知道,他是优先级最高的切面。
TccCoordinatorMethodAspect 切面
- @Aspect
- @Component
- public class TccCoordinatorMethodAspect implements Ordered {
-
-
- private final TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor;
-
- @Autowired
- public TccCoordinatorMethodAspect(TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor) {
- this.tccCoordinatorMethodInterceptor = tccCoordinatorMethodInterceptor;
- }
-
-
- @Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")
- public void coordinatorMethod() {
-
- }
-
- @Around("coordinatorMethod()")
- public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
- return tccCoordinatorMethodInterceptor.interceptor(proceedingJoinPoint);
- }
-
-
- @Override
- public int getOrder() {
- return Ordered.HIGHEST_PRECEDENCE + 1;
- }
- }复制代码
该切面是第二优先级的切面,意思就是当我们对有@Tcc注解的方法发起调用的时候,首先会进入 TccTransactionAspect 切面,然后再进入 TccCoordinatorMethodAspect 切面。
该切面主要是用来获取@Tcc注解上的元数据,比如confrim方法名称等等。到这里如果大家基本能明白的话,差不多就了解了整个框架原理,现在让我们来跟踪调用吧。
现在我们回过头,我们来调用 PaymentServiceImpl.makePayment 方法
- dubbo用户,我们会进入如下拦截器:
-
- @Component
- public class DubboTccTransactionInterceptor implements TccTransactionInterceptor {
-
- private final TccTransactionAspectService tccTransactionAspectService;
-
- @Autowired
- public DubboTccTransactionInterceptor(TccTransactionAspectService tccTransactionAspectService) {
- this.tccTransactionAspectService = tccTransactionAspectService;
- }
-
-
- @Override
- public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
- final String context = RpcContext.getContext().getAttachment(Constant.TCC_TRANSACTION_CONTEXT);
- TccTransactionContext tccTransactionContext = null;
- if (StringUtils.isNoneBlank(context)) {
- tccTransactionContext =
- GsonUtils.getInstance().fromJson(context, TccTransactionContext.class);
- }
- return tccTransactionAspectService.invoke(tccTransactionContext, pjp);
- }
- }复制代码
- 我们继续跟踪 tccTransactionAspectService.invoke(tccTransactionContext, pjp) ,发现通过判断过后,我们会进入 StartTccTransactionHandler.handler 方法,我们已经进入了分布式事务处理的入口了!
- /**
- * 分布式事务处理接口
- *
- * @param point point 切点
- * @param context 信息
- * @return Object
- * @throws Throwable 异常
- */
- @Override
- public Object handler(ProceedingJoinPoint point, TccTransactionContext context) throws Throwable {
- Object returnValue;
- try {
- //开启分布式事务
- tccTransactionManager.begin();
- try {
- //发起调用 执行try方法
- returnValue = point.proceed();
-
- } catch (Throwable throwable) {
- //异常执行cancel
-
- tccTransactionManager.cancel();
-
- throw throwable;
- }
- //try成功执行confirm confirm 失败的话,那就只能走本地补偿
- tccTransactionManager.confirm();
- } finally {
- tccTransactionManager.remove();
- }
- return returnValue;
- }复制代码
- 我们进行跟进 tccTransactionManager.begin() 方法:
- /**
- * 该方法为发起方第一次调用
- * 也是tcc事务的入口
- */
- void begin() {
- LogUtil.debug(LOGGER, () -> "开始执行tcc事务!start");
- TccTransaction tccTransaction = CURRENT.get();
- if (Objects.isNull(tccTransaction)) {
- tccTransaction = new TccTransaction();
- tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
- tccTransaction.setRole(TccRoleEnum.START.getCode());
- }
- //保存当前事务信息
- coordinatorCommand.execute(new CoordinatorAction(CoordinatorActionEnum.SAVE, tccTransaction));
-
- CURRENT.set(tccTransaction);
-
- //设置tcc事务上下文,这个类会传递给远端
- TccTransactionContext context = new TccTransactionContext();
- context.setAction(TccActionEnum.TRYING.getCode());//设置执行动作为try
- context.setTransId(tccTransaction.getTransId());//设置事务id
- TransactionContextLocal.getInstance().set(context);
-
- }复制代码
这里我们保存了事务信息,并且开启了事务上下文,并把它保存在了ThreadLoacl里面,大家想想这里为什么一定要保存在ThreadLocal里面。
begin方法执行完后,我们回到切面,现在我们来执行 point.proceed(),当执行这一句代码的时候,会进入第二个切面,即进入了 TccCoordinatorMethodInterceptor
-
- public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
-
- final TccTransaction currentTransaction = tccTransactionManager.getCurrentTransaction();
-
- if (Objects.nonNull(currentTransaction)) {
- final TccActionEnum action = TccActionEnum.acquireByCode(currentTransaction.getStatus());
- switch (action) {
- case TRYING:
- registerParticipant(pjp, currentTransaction.getTransId());
- break;
- case CONFIRMING:
- break;
- case CANCELING:
- break;
- }
- }
- return pjp.proceed(pjp.getArgs());
- }复制代码
- 这里由于是在try阶段,直接就进入了 registerParticipant 方法
- private void registerParticipant(ProceedingJoinPoint point, String transId) throws NoSuchMethodException {
-
- MethodSignature signature = (MethodSignature) point.getSignature();
- Method method = signature.getMethod();
-
- Class<?> clazz = point.getTarget().getClass();
-
- Object[] args = point.getArgs();
-
- final Tcc tcc = method.getAnnotation(Tcc.class);
-
- //获取协调方法
- String confirmMethodName = tcc.confirmMethod();
-
- /* if (StringUtils.isBlank(confirmMethodName)) {
- confirmMethodName = method.getName();
- }*/
-
- String cancelMethodName = tcc.cancelMethod();
-
- /* if (StringUtils.isBlank(cancelMethodName)) {
- cancelMethodName = method.getName();
- }
- */
- //设置模式
- final TccPatternEnum pattern = tcc.pattern();
-
- tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
-
-
- TccInvocation confirmInvocation = null;
- if (StringUtils.isNoneBlank(confirmMethodName)) {
- confirmInvocation = new TccInvocation(clazz,
- confirmMethodName, method.getParameterTypes(), args);
- }
-
- TccInvocation cancelInvocation = null;
- if (StringUtils.isNoneBlank(cancelMethodName)) {
- cancelInvocation = new TccInvocation(clazz,
- cancelMethodName,
- method.getParameterTypes(), args);
- }
-
-
- //封装调用点
- final Participant participant = new Participant(
- transId,
- confirmInvocation,
- cancelInvocation);
-
- tccTransactionManager.enlistParticipant(participant);
-
- }复制代码
- 这里就获取了 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
的信息,并把他封装成了 Participant,存起来,然后真正的调用 PaymentServiceImpl.makePayment 业务方法,在业务方法里面,我们首先执行的是更新订单状态(相信现在你还记得0.0):
- order.setStatus(OrderStatusEnum.PAYING.getCode());
- orderMapper.update(order);复制代码
- 接下来,我们进行扣除用户余额,注意扣除余额这里是一个rpc方法:
- AccountDTO accountDTO = new AccountDTO();
- accountDTO.setAmount(order.getTotalAmount());
- accountDTO.setUserId(order.getUserId());
- accountService.payment(accountDTO)复制代码
- 现在我们来关注 accountService.payment(accountDTO) ,这个接口的定义。
dubbo接口:
-
- public interface AccountService {
-
-
- /**
- * 扣款支付
- *
- * @param accountDTO 参数dto
- * @return true
- */
- @Tcc
- boolean payment(AccountDTO accountDTO);
- }复制代码
springcloud接口
- @FeignClient(value = "account-service", configuration = MyConfiguration.class)
- public interface AccountClient {
-
- @PostMapping("/account-service/account/payment")
- @Tcc
- Boolean payment(@RequestBody AccountDTO accountDO);
-
- }复制代码
很明显这里我们都在接口上加了@Tcc注解,我们知道springAop的特性,在接口上加注解,是无法进入切面的,所以我们在这里,要采用rpc框架的某些特性来帮助我们获取到 @Tcc注解信息。 这一步很重要。当我们发起
accountService.payment(accountDTO) 调用的时候:
- dubbo用户,会走dubbo的filter接口,TccTransactionFilter:
- private void registerParticipant(Class clazz, String methodName, Object[] arguments, Class... args) throws TccRuntimeException {
- try {
- Method method = clazz.getDeclaredMethod(methodName, args);
- Tcc tcc = method.getAnnotation(Tcc.class);
- if (Objects.nonNull(tcc)) {
-
- //获取事务的上下文
- final TccTransactionContext tccTransactionContext =
- TransactionContextLocal.getInstance().get();
- if (Objects.nonNull(tccTransactionContext)) {
- //dubbo rpc传参数
- RpcContext.getContext()
- .setAttachment(Constant.TCC_TRANSACTION_CONTEXT,
- GsonUtils.getInstance().toJson(tccTransactionContext));
- }
- if (Objects.nonNull(tccTransactionContext)) {
- if(TccActionEnum.TRYING.getCode()==tccTransactionContext.getAction()){
- //获取协调方法
- String confirmMethodName = tcc.confirmMethod();
-
- if (StringUtils.isBlank(confirmMethodName)) {
- confirmMethodName = method.getName();
- }
-
- String cancelMethodName = tcc.cancelMethod();
-
- if (StringUtils.isBlank(cancelMethodName)) {
- cancelMethodName = method.getName();
- }
- //设置模式
- final TccPatternEnum pattern = tcc.pattern();
-
- tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
- TccInvocation confirmInvocation = new TccInvocation(clazz,
- confirmMethodName,
- args, arguments);
-
- TccInvocation cancelInvocation = new TccInvocation(clazz,
- cancelMethodName,
- args, arguments);
-
- //封装调用点
- final Participant participant = new Participant(
- tccTransactionContext.getTransId(),
- confirmInvocation,
- cancelInvocation);
-
- tccTransactionManager.enlistParticipant(participant);
- }
-
- }
-
-
-
- }
- } catch (NoSuchMethodException e) {
- throw new TccRuntimeException("not fount method " + e.getMessage());
- }
-
- }复制代码
- springcloud 用户,则会进入 TccFeignHandler :
- public class TccFeignHandler implements InvocationHandler {
- /**
- * logger
- */
- private static final Logger LOGGER = LoggerFactory.getLogger(TccFeignHandler.class);
-
- private Target<?> target;
- private Map<Method, MethodHandler> handlers;
-
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- if (Object.class.equals(method.getDeclaringClass())) {
- return method.invoke(this, args);
- } else {
-
- final Tcc tcc = method.getAnnotation(Tcc.class);
- if (Objects.isNull(tcc)) {
- return this.handlers.get(method).invoke(args);
- }
-
- final TccTransactionContext tccTransactionContext =
- TransactionContextLocal.getInstance().get();
- if (Objects.nonNull(tccTransactionContext)) {
- final TccTransactionManager tccTransactionManager =
- SpringBeanUtils.getInstance().getBean(TccTransactionManager.class);
- if (TccActionEnum.TRYING.getCode() == tccTransactionContext.getAction()) {
- //获取协调方法
- String confirmMethodName = tcc.confirmMethod();
-
- if (StringUtils.isBlank(confirmMethodName)) {
- confirmMethodName = method.getName();
- }
-
- String cancelMethodName = tcc.cancelMethod();
-
- if (StringUtils.isBlank(cancelMethodName)) {
- cancelMethodName = method.getName();
- }
-
- //设置模式
- final TccPatternEnum pattern = tcc.pattern();
-
- tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
-
- final Class<?> declaringClass = method.getDeclaringClass();
-
- TccInvocation confirmInvocation = new TccInvocation(declaringClass,
- confirmMethodName,
- method.getParameterTypes(), args);
-
- TccInvocation cancelInvocation = new TccInvocation(declaringClass,
- cancelMethodName,
- method.getParameterTypes(), args);
-
- //封装调用点
- final Participant participant = new Participant(
- tccTransactionContext.getTransId(),
- confirmInvocation,
- cancelInvocation);
-
- tccTransactionManager.enlistParticipant(participant);
- }
-
- }
-
-
- return this.handlers.get(method).invoke(args);
- }
- }
-
-
- public void setTarget(Target<?> target) {
- this.target = target;
- }
-
-
- public void setHandlers(Map<Method, MethodHandler> handlers) {
- this.handlers = handlers;
- }
-
- }复制代码