当前位置:   article > 正文

tcc分布式事务框架源码解析系列(四)之项目实战

切面 confirmmethod
通过之前的几篇文章我相信您已经搭建好了运行环境,本次的项目实战是依照happylifeplat-tcc-demo项目来演练,也是非常经典的分布式事务场景:支付成功,进行订单状态的更新,扣除用户账户,库存扣减这几个模块来进行tcc分布式事务。话不多说,让我们一起进入体验吧!
  • 首先我们找到 PaymentServiceImpl.makePayment 方法,这是tcc分布式事务的发起者
  1. //tcc分布式事务的发起者
  2. @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
  3. public void makePayment(Order order) {
  4. order.setStatus(OrderStatusEnum.PAYING.getCode());
  5. orderMapper.update(order);
  6. //扣除用户余额 远端的rpc调用
  7. AccountDTO accountDTO = new AccountDTO();
  8. accountDTO.setAmount(order.getTotalAmount());
  9. accountDTO.setUserId(order.getUserId());
  10. accountService.payment(accountDTO);
  11. //进入扣减库存操作 远端的rpc调用
  12. InventoryDTO inventoryDTO = new InventoryDTO();
  13. inventoryDTO.setCount(order.getCount());
  14. inventoryDTO.setProductId(order.getProductId());
  15. inventoryService.decrease(inventoryDTO);
  16. }
  17. //更新订单的confirm方法
  18. public void confirmOrderStatus(Order order) {
  19. order.setStatus(OrderStatusEnum.PAY_SUCCESS.getCode());
  20. orderMapper.update(order);
  21. LOGGER.info("=========进行订单confirm操作完成================");
  22. }
  23. //更新订单的cancel方法
  24. public void cancelOrderStatus(Order order) {
  25. order.setStatus(OrderStatusEnum.PAY_FAIL.getCode());
  26. orderMapper.update(order);
  27. LOGGER.info("=========进行订单cancel操作完成================");
  28. }复制代码
  • makePayment 方法是tcc分布式事务的发起者,它里面有更新订单状态(本地),进行库存扣减(rpc扣减),资金扣减(rpc调用)

  • confirmOrderStatus 为订单状态confirm方法,cancelOrderStatus 为订单状态cancel方法。

  • 我们重点关注 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus") 这个注解,为什么加了这个注解就有神奇的作用。

@Tcc注解切面

  1. @Aspect
  2. public abstract class TccTransactionAspect {
  3. private TccTransactionInterceptor tccTransactionInterceptor;
  4. public void setTccTransactionInterceptor(TccTransactionInterceptor tccTransactionInterceptor) {
  5. this.tccTransactionInterceptor = tccTransactionInterceptor;
  6. }
  7. @Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")
  8. public void txTransactionInterceptor() {
  9. }
  10. @Around("txTransactionInterceptor()")
  11. public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
  12. return tccTransactionInterceptor.interceptor(proceedingJoinPoint);
  13. }
  14. public abstract int getOrder();
  15. }复制代码
  • 我们可以知道Spring实现类的方法凡是加了@Tcc注解的,在调用的时候,都会进行 tccTransactionInterceptor.interceptor 调用。

  • 其次我们发现该类是一个抽象类,肯定会有其他类继承它,你猜的没错,对应dubbo用户,他的继承类为:

  1. @Aspect
  2. @Component
  3. public class DubboTccTransactionAspect extends TccTransactionAspect implements Ordered {
  4. @Autowired
  5. public DubboTccTransactionAspect(DubboTccTransactionInterceptor dubboTccTransactionInterceptor) {
  6. super.setTccTransactionInterceptor(dubboTccTransactionInterceptor);
  7. }
  8. @Override
  9. public int getOrder() {
  10. return Ordered.HIGHEST_PRECEDENCE;
  11. }
  12. }复制代码
  • springcloud的用户,它的继承类为:
  1. @Aspect
  2. @Component
  3. public class SpringCloudTxTransactionAspect extends TccTransactionAspect implements Ordered {
  4. @Autowired
  5. public SpringCloudTxTransactionAspect(SpringCloudTxTransactionInterceptor springCloudTxTransactionInterceptor) {
  6. this.setTccTransactionInterceptor(springCloudTxTransactionInterceptor);
  7. }
  8. public void init() {
  9. }
  10. @Override
  11. public int getOrder() {
  12. return Ordered.HIGHEST_PRECEDENCE;
  13. }
  14. }复制代码
  • 我们注意到他们都实现了Spring的Ordered接口,并重写了 getOrder 方法,都返回了 Ordered.HIGHEST_PRECEDENCE 那么可以知道,他是优先级最高的切面。

TccCoordinatorMethodAspect 切面

  1. @Aspect
  2. @Component
  3. public class TccCoordinatorMethodAspect implements Ordered {
  4. private final TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor;
  5. @Autowired
  6. public TccCoordinatorMethodAspect(TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor) {
  7. this.tccCoordinatorMethodInterceptor = tccCoordinatorMethodInterceptor;
  8. }
  9. @Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")
  10. public void coordinatorMethod() {
  11. }
  12. @Around("coordinatorMethod()")
  13. public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
  14. return tccCoordinatorMethodInterceptor.interceptor(proceedingJoinPoint);
  15. }
  16. @Override
  17. public int getOrder() {
  18. return Ordered.HIGHEST_PRECEDENCE + 1;
  19. }
  20. }复制代码
  • 该切面是第二优先级的切面,意思就是当我们对有@Tcc注解的方法发起调用的时候,首先会进入 TccTransactionAspect 切面,然后再进入 TccCoordinatorMethodAspect 切面。
    该切面主要是用来获取@Tcc注解上的元数据,比如confrim方法名称等等。

  • 到这里如果大家基本能明白的话,差不多就了解了整个框架原理,现在让我们来跟踪调用吧。

现在我们回过头,我们来调用 PaymentServiceImpl.makePayment 方法

  • dubbo用户,我们会进入如下拦截器:
  1. @Component
  2. public class DubboTccTransactionInterceptor implements TccTransactionInterceptor {
  3. private final TccTransactionAspectService tccTransactionAspectService;
  4. @Autowired
  5. public DubboTccTransactionInterceptor(TccTransactionAspectService tccTransactionAspectService) {
  6. this.tccTransactionAspectService = tccTransactionAspectService;
  7. }
  8. @Override
  9. public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
  10. final String context = RpcContext.getContext().getAttachment(Constant.TCC_TRANSACTION_CONTEXT);
  11. TccTransactionContext tccTransactionContext = null;
  12. if (StringUtils.isNoneBlank(context)) {
  13. tccTransactionContext =
  14. GsonUtils.getInstance().fromJson(context, TccTransactionContext.class);
  15. }
  16. return tccTransactionAspectService.invoke(tccTransactionContext, pjp);
  17. }
  18. }复制代码
  • 我们继续跟踪 tccTransactionAspectService.invoke(tccTransactionContext, pjp) ,发现通过判断过后,我们会进入 StartTccTransactionHandler.handler 方法,我们已经进入了分布式事务处理的入口了!
  1. /**
  2. * 分布式事务处理接口
  3. *
  4. * @param point point 切点
  5. * @param context 信息
  6. * @return Object
  7. * @throws Throwable 异常
  8. */
  9. @Override
  10. public Object handler(ProceedingJoinPoint point, TccTransactionContext context) throws Throwable {
  11. Object returnValue;
  12. try {
  13. //开启分布式事务
  14. tccTransactionManager.begin();
  15. try {
  16. //发起调用 执行try方法
  17. returnValue = point.proceed();
  18. } catch (Throwable throwable) {
  19. //异常执行cancel
  20. tccTransactionManager.cancel();
  21. throw throwable;
  22. }
  23. //try成功执行confirm confirm 失败的话,那就只能走本地补偿
  24. tccTransactionManager.confirm();
  25. } finally {
  26. tccTransactionManager.remove();
  27. }
  28. return returnValue;
  29. }复制代码
  • 我们进行跟进 tccTransactionManager.begin() 方法:
  1. /**
  2. * 该方法为发起方第一次调用
  3. * 也是tcc事务的入口
  4. */
  5. void begin() {
  6. LogUtil.debug(LOGGER, () -> "开始执行tcc事务!start");
  7. TccTransaction tccTransaction = CURRENT.get();
  8. if (Objects.isNull(tccTransaction)) {
  9. tccTransaction = new TccTransaction();
  10. tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
  11. tccTransaction.setRole(TccRoleEnum.START.getCode());
  12. }
  13. //保存当前事务信息
  14. coordinatorCommand.execute(new CoordinatorAction(CoordinatorActionEnum.SAVE, tccTransaction));
  15. CURRENT.set(tccTransaction);
  16. //设置tcc事务上下文,这个类会传递给远端
  17. TccTransactionContext context = new TccTransactionContext();
  18. context.setAction(TccActionEnum.TRYING.getCode());//设置执行动作为try
  19. context.setTransId(tccTransaction.getTransId());//设置事务id
  20. TransactionContextLocal.getInstance().set(context);
  21. }复制代码
  • 这里我们保存了事务信息,并且开启了事务上下文,并把它保存在了ThreadLoacl里面,大家想想这里为什么一定要保存在ThreadLocal里面。

  • begin方法执行完后,我们回到切面,现在我们来执行 point.proceed(),当执行这一句代码的时候,会进入第二个切面,即进入了 TccCoordinatorMethodInterceptor

  1. public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
  2. final TccTransaction currentTransaction = tccTransactionManager.getCurrentTransaction();
  3. if (Objects.nonNull(currentTransaction)) {
  4. final TccActionEnum action = TccActionEnum.acquireByCode(currentTransaction.getStatus());
  5. switch (action) {
  6. case TRYING:
  7. registerParticipant(pjp, currentTransaction.getTransId());
  8. break;
  9. case CONFIRMING:
  10. break;
  11. case CANCELING:
  12. break;
  13. }
  14. }
  15. return pjp.proceed(pjp.getArgs());
  16. }复制代码
  • 这里由于是在try阶段,直接就进入了 registerParticipant 方法
  1. private void registerParticipant(ProceedingJoinPoint point, String transId) throws NoSuchMethodException {
  2. MethodSignature signature = (MethodSignature) point.getSignature();
  3. Method method = signature.getMethod();
  4. Class<?> clazz = point.getTarget().getClass();
  5. Object[] args = point.getArgs();
  6. final Tcc tcc = method.getAnnotation(Tcc.class);
  7. //获取协调方法
  8. String confirmMethodName = tcc.confirmMethod();
  9. /* if (StringUtils.isBlank(confirmMethodName)) {
  10. confirmMethodName = method.getName();
  11. }*/
  12. String cancelMethodName = tcc.cancelMethod();
  13. /* if (StringUtils.isBlank(cancelMethodName)) {
  14. cancelMethodName = method.getName();
  15. }
  16. */
  17. //设置模式
  18. final TccPatternEnum pattern = tcc.pattern();
  19. tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
  20. TccInvocation confirmInvocation = null;
  21. if (StringUtils.isNoneBlank(confirmMethodName)) {
  22. confirmInvocation = new TccInvocation(clazz,
  23. confirmMethodName, method.getParameterTypes(), args);
  24. }
  25. TccInvocation cancelInvocation = null;
  26. if (StringUtils.isNoneBlank(cancelMethodName)) {
  27. cancelInvocation = new TccInvocation(clazz,
  28. cancelMethodName,
  29. method.getParameterTypes(), args);
  30. }
  31. //封装调用点
  32. final Participant participant = new Participant(
  33. transId,
  34. confirmInvocation,
  35. cancelInvocation);
  36. tccTransactionManager.enlistParticipant(participant);
  37. }复制代码
  • 这里就获取了 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
    的信息,并把他封装成了 Participant,存起来,然后真正的调用 PaymentServiceImpl.makePayment 业务方法,在业务方法里面,我们首先执行的是更新订单状态(相信现在你还记得0.0):
  1. order.setStatus(OrderStatusEnum.PAYING.getCode());
  2. orderMapper.update(order);复制代码
  • 接下来,我们进行扣除用户余额,注意扣除余额这里是一个rpc方法:
  1. AccountDTO accountDTO = new AccountDTO();
  2. accountDTO.setAmount(order.getTotalAmount());
  3. accountDTO.setUserId(order.getUserId());
  4. accountService.payment(accountDTO)复制代码
  • 现在我们来关注 accountService.payment(accountDTO) ,这个接口的定义。
dubbo接口:
  1. public interface AccountService {
  2. /**
  3. * 扣款支付
  4. *
  5. * @param accountDTO 参数dto
  6. * @return true
  7. */
  8. @Tcc
  9. boolean payment(AccountDTO accountDTO);
  10. }复制代码
springcloud接口
  1. @FeignClient(value = "account-service", configuration = MyConfiguration.class)
  2. public interface AccountClient {
  3. @PostMapping("/account-service/account/payment")
  4. @Tcc
  5. Boolean payment(@RequestBody AccountDTO accountDO);
  6. }复制代码

很明显这里我们都在接口上加了@Tcc注解,我们知道springAop的特性,在接口上加注解,是无法进入切面的,所以我们在这里,要采用rpc框架的某些特性来帮助我们获取到 @Tcc注解信息。 这一步很重要。当我们发起

accountService.payment(accountDTO) 调用的时候:

  • dubbo用户,会走dubbo的filter接口,TccTransactionFilter:
  1. private void registerParticipant(Class clazz, String methodName, Object[] arguments, Class... args) throws TccRuntimeException {
  2. try {
  3. Method method = clazz.getDeclaredMethod(methodName, args);
  4. Tcc tcc = method.getAnnotation(Tcc.class);
  5. if (Objects.nonNull(tcc)) {
  6. //获取事务的上下文
  7. final TccTransactionContext tccTransactionContext =
  8. TransactionContextLocal.getInstance().get();
  9. if (Objects.nonNull(tccTransactionContext)) {
  10. //dubbo rpc传参数
  11. RpcContext.getContext()
  12. .setAttachment(Constant.TCC_TRANSACTION_CONTEXT,
  13. GsonUtils.getInstance().toJson(tccTransactionContext));
  14. }
  15. if (Objects.nonNull(tccTransactionContext)) {
  16. if(TccActionEnum.TRYING.getCode()==tccTransactionContext.getAction()){
  17. //获取协调方法
  18. String confirmMethodName = tcc.confirmMethod();
  19. if (StringUtils.isBlank(confirmMethodName)) {
  20. confirmMethodName = method.getName();
  21. }
  22. String cancelMethodName = tcc.cancelMethod();
  23. if (StringUtils.isBlank(cancelMethodName)) {
  24. cancelMethodName = method.getName();
  25. }
  26. //设置模式
  27. final TccPatternEnum pattern = tcc.pattern();
  28. tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
  29. TccInvocation confirmInvocation = new TccInvocation(clazz,
  30. confirmMethodName,
  31. args, arguments);
  32. TccInvocation cancelInvocation = new TccInvocation(clazz,
  33. cancelMethodName,
  34. args, arguments);
  35. //封装调用点
  36. final Participant participant = new Participant(
  37. tccTransactionContext.getTransId(),
  38. confirmInvocation,
  39. cancelInvocation);
  40. tccTransactionManager.enlistParticipant(participant);
  41. }
  42. }
  43. }
  44. } catch (NoSuchMethodException e) {
  45. throw new TccRuntimeException("not fount method " + e.getMessage());
  46. }
  47. }复制代码
  • springcloud 用户,则会进入 TccFeignHandler
  1. public class TccFeignHandler implements InvocationHandler {
  2. /**
  3. * logger
  4. */
  5. private static final Logger LOGGER = LoggerFactory.getLogger(TccFeignHandler.class);
  6. private Target<?> target;
  7. private Map<Method, MethodHandler> handlers;
  8. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  9. if (Object.class.equals(method.getDeclaringClass())) {
  10. return method.invoke(this, args);
  11. } else {
  12. final Tcc tcc = method.getAnnotation(Tcc.class);
  13. if (Objects.isNull(tcc)) {
  14. return this.handlers.get(method).invoke(args);
  15. }
  16. final TccTransactionContext tccTransactionContext =
  17. TransactionContextLocal.getInstance().get();
  18. if (Objects.nonNull(tccTransactionContext)) {
  19. final TccTransactionManager tccTransactionManager =
  20. SpringBeanUtils.getInstance().getBean(TccTransactionManager.class);
  21. if (TccActionEnum.TRYING.getCode() == tccTransactionContext.getAction()) {
  22. //获取协调方法
  23. String confirmMethodName = tcc.confirmMethod();
  24. if (StringUtils.isBlank(confirmMethodName)) {
  25. confirmMethodName = method.getName();
  26. }
  27. String cancelMethodName = tcc.cancelMethod();
  28. if (StringUtils.isBlank(cancelMethodName)) {
  29. cancelMethodName = method.getName();
  30. }
  31. //设置模式
  32. final TccPatternEnum pattern = tcc.pattern();
  33. tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
  34. final Class<?> declaringClass = method.getDeclaringClass();
  35. TccInvocation confirmInvocation = new TccInvocation(declaringClass,
  36. confirmMethodName,
  37. method.getParameterTypes(), args);
  38. TccInvocation cancelInvocation = new TccInvocation(declaringClass,
  39. cancelMethodName,
  40. method.getParameterTypes(), args);
  41. //封装调用点
  42. final Participant participant = new Participant(
  43. tccTransactionContext.getTransId(),
  44. confirmInvocation,
  45. cancelInvocation);
  46. tccTransactionManager.enlistParticipant(participant);
  47. }
  48. }
  49. return this.handlers.get(method).invoke(args);
  50. }
  51. }
  52. public void setTarget(Target<?> target) {
  53. this.target = target;
  54. }
  55. public void setHandlers(Map<Method, MethodHandler> handlers) {
  56. this.handlers = handlers;
  57. }
  58. }复制代码

重要提示 在这里我们获取了在第一步设置的事务上下文,然后进行dubbo的rpc传参数,细心的你可能会发现,这里获取远端confirm,cancel方法其实就是自己本身啊?,那发起者还怎么来调用远端的confrim,cancel方法呢?这里的调用,我们通过事务上下文的状态来控制,比如在confrim阶段,我们就调用confrim方法等。。

到这里我们就完成了整个消费端的调用,下一篇分析提供者的调用流程!

转载于:https://juejin.im/post/59e080126fb9a0450f20f408

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号