当前位置:   article > 正文

分布式事务Hmily TCC源码跟读记录_依赖包 hmilyresttemplateinterceptor()

依赖包 hmilyresttemplateinterceptor()

这里就不罗列TCC事务的优点了,奉上该项目的地址https://github.com/yu199195/hmily

一、什么是分布式事务

分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点上,

本质上来说,分布式事务是为了保证不同数据库的数据一致性

TCC事务主要是基于AOP切面拦截实现的三阶段提交事务,下面我们来跟读源码

二、CAP理论

解决多个微服务之间数据可能不一致的问题,一般有个CAP理论:

  • C: 一致性.在分布式系统中的所有数据备份,在同一时刻具有同样的值,所有节点在同一时刻读取的数据都是最新的数据副本
  • A: 可用性,好的响应性能.完全的可用性指的是在任何故障模型下,服务都会有限的时间内处理完成并进行响应
  • P:分区容忍性.尽管网络有部分消息丢失,但系统仍然可以继续工作

CAP原理证明,任何分布式系统只可同时满足以上两点,无法三者兼顾.由于关系型数据库是单节点无复制的,因此不具有分区容忍性,但具有一致性和可用性,而分布式的微服务系统都必须满足分区容忍性,SpringCloud中的Eureka就是A P 定理的结合

三、Hmily框架源码跟读

项目结构:

1.hmily-tcc-admin:用来监控事务日志

2.hmily-tcc-annotation:提供tcc aop注解

3.hmily-tcc-common:提供一些工具类和配置类

4.hmily-tcc-core:分布式事务执行的核心部分

这个框架主要模拟一个场景,下单---->新建订单---->扣款----->减库存的操作,涉及到3个库

tcc分布式事务主要是基于AOP切面原理进行注解拦截三阶段提交(try-confirm-cancel)的分布式事务,下面我们进行源码跟读:

  • 注解拦截器

HmilyTransactionBootstrap启动引导类

  1. /**
  2. * tcc分布式事务框架注解.
  3. * @author xiaoyu
  4. */
  5. @Retention(RetentionPolicy.RUNTIME)
  6. @Target({ElementType.METHOD})
  7. public @interface Tcc {
  8. /**
  9. * spring事务传播.
  10. * @return {@linkplain PropagationEnum}
  11. */
  12. PropagationEnum propagation() default PropagationEnum.PROPAGATION_REQUIRED;
  13. /**
  14. * tcc框架确认方法 tcc中第一个c.
  15. *
  16. * @return confirm方法名称
  17. */
  18. String confirmMethod() default "";
  19. /**
  20. * tcc框架确认方法 tcc中第二个c.
  21. *
  22. * @return cancel方法名称
  23. */
  24. String cancelMethod() default "";
  25. /**
  26. * 模式 tcc 和cc模式.
  27. * tcc模式代表try中有数据库操作,try需要回滚.
  28. * cc模式代表try中无数据库操作,try不需要回滚.
  29. *
  30. * @return {@linkplain TccPatternEnum}
  31. */
  32. TccPatternEnum pattern() default TccPatternEnum.TCC;
  33. }

该注解通过类AbstractTccTransactionAspect的子类SpringCloudHmilyTransactionAspect拦截,切点是该注解,环绕执行的是TccTransactionInterceptor实现类的interceptor方法,当方法上有tcc注解时,都会执行该切面拦截器方法,

  1. @Aspect
  2. public abstract class AbstractTccTransactionAspect {
  3. private TccTransactionInterceptor tccTransactionInterceptor;
  4. protected void setTccTransactionInterceptor(final TccTransactionInterceptor tccTransactionInterceptor) {
  5. this.tccTransactionInterceptor = tccTransactionInterceptor;
  6. }
  7. /**
  8. * this is point cut with {@linkplain com.hmily.tcc.annotation.Tcc }.
  9. */
  10. @Pointcut("@annotation(com.hmily.tcc.annotation.Tcc)")
  11. public void hmilyTccInterceptor() {
  12. }
  13. /**
  14. * this is around in {@linkplain com.hmily.tcc.annotation.Tcc }.
  15. * @param proceedingJoinPoint proceedingJoinPoint
  16. * @return Object
  17. * @throws Throwable Throwable
  18. */
  19. @Around("hmilyTccInterceptor()")
  20. public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
  21. return tccTransactionInterceptor.interceptor(proceedingJoinPoint);
  22. }
  23. /**
  24. * spring Order.
  25. *
  26. * @return int
  27. */
  28. public abstract int getOrder();
  29. }

子类会重写setTccTransactionInterceptor方法,程序启动就注入了SpringCloudHmilyTransactionInterceptor

接而进入到SpringCloudHmilyTransactionInterceptor的interceptor方法,该方法内定义TccTransactionContext对象,该对象定义了事务上下文,用来存储本地事务,包含事务Id,事务阶段,事务参与者的角色

 

  • try阶段

try 阶段,也就是本地事务阶段,还未发起远程调用,AOP拦截到@TCC注解,此时TccTransactionContext事务上下文为空

  1. @Override
  2. public Object interceptor(final ProceedingJoinPoint pjp) throws Throwable {
  3. TccTransactionContext tccTransactionContext;
  4. //如果不是本地反射调用补偿
  5. RequestAttributes requestAttributes = null;
  6. try {
  7. requestAttributes = RequestContextHolder.currentRequestAttributes();
  8. } catch (Throwable ex) {
  9. LogUtil.warn(LOGGER, () -> "can not acquire request info:" + ex.getLocalizedMessage());
  10. }
  11. HttpServletRequest request = requestAttributes == null ? null : ((ServletRequestAttributes) requestAttributes).getRequest();
  12. String context = request == null ? null : request.getHeader(CommonConstant.TCC_TRANSACTION_CONTEXT);
  13. if (StringUtils.isNoneBlank(context)) {
  14. tccTransactionContext = GsonUtils.getInstance().fromJson(context, TccTransactionContext.class);
  15. } else {
  16. tccTransactionContext = TransactionContextLocal.getInstance().get();
  17. if (Objects.nonNull(tccTransactionContext)) {
  18. tccTransactionContext.setRole(TccRoleEnum.SPRING_CLOUD.getCode());
  19. }
  20. }
  21. return hmilyTransactionAspectService.invoke(tccTransactionContext, pjp);
  22. }

事务拦截器首先会获取当前请求,从请求头里获取事务上下文后和切面参数一起进入事务service切面的invoke方法,关于事务上下文header请看HmilyRestTemplateInterceptor类

  1. @Override
  2. public Object invoke(final TccTransactionContext tccTransactionContext, final ProceedingJoinPoint point) throws Throwable {
  3. final Class clazz = hmilyTransactionFactoryService.factoryOf(tccTransactionContext);
  4. final HmilyTransactionHandler txTransactionHandler = (HmilyTransactionHandler) SpringBeanUtils.getInstance().getBean(clazz);
  5. return txTransactionHandler.handler(point, tccTransactionContext);
  6. }

该方法会根据事务上下文获取当前事务处于发起者还是参与者阶段,不同阶段返回不同的HmilyTransactionHandler子类,它由3实现类分别是发起者事务处理,参与者事务处理和本地事务处理,一般最常用的是发起者和参与者

由上述debug可知,在发起阶段由SpringCloudHmilyTransactionInterceptor的invoke方法传递过来的事务上下文是null,所以是发起者,begin是初始化事务,当point proceed执行完后,更新事务状态为try阶段,并更新事务日志.

该方法维护了一个TransactionContextLocal,用来存储TccTransactionContext事务上下文,内部是threadlocal,可以将事务上下文绑定到当前线程里.

如果出了异常,则在StarterHmilyTransactionHandler#handler方法catch块执行cancel方法,如果没有异常,就执行confirm方法确认tcc事务完成

  1. @Override
  2. public Object handler(final ProceedingJoinPoint point, TccTransactionContext context)
  3. throws Throwable {
  4. Object returnValue = null;
  5. try {
  6. TccTransaction tccTransaction;
  7. context = TransactionContextLocal.getInstance().get();
  8. if (context == null) {
  9. tccTransaction = hmilyTransactionExecutor.begin(point);
  10. try {
  11. //execute try
  12. returnValue = point.proceed();
  13. tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
  14. hmilyTransactionExecutor.updateStatus(tccTransaction);
  15. } catch (Throwable throwable) {
  16. //if exception ,execute cancel
  17. hmilyTransactionExecutor
  18. .cancel(hmilyTransactionExecutor.getCurrentTransaction());
  19. throw throwable;
  20. }
  21. //execute confirm
  22. hmilyTransactionExecutor.confirm(hmilyTransactionExecutor.getCurrentTransaction());
  23. } else if (context.getAction() == TccActionEnum.CONFIRMING.getCode()) {
  24. //execute confirm
  25. hmilyTransactionExecutor.confirm(hmilyTransactionExecutor.getCurrentTransaction());
  26. }
  27. } finally {
  28. hmilyTransactionExecutor.remove();
  29. }
  30. return returnValue;
  31. }

当发起者执行ponit.proceed完毕意味着包括了多个参与者的tcc方法也一起执行完毕了.

接下来是消费者的方法,一样拦截tcc注解和上面代码一样走

在这里返回的是消费者的处理器

接着放行执行代码feign,进入feign的拦截器,由于SpringCloudHmilyTransactionAspect切面设置了最高级的排序所以会比Feign拦截器切面优先执行,下面来看HmilyRestTemplateConfiguration自定义的feign拦截器

第60行代码就是我们上面说的从请求通过"事务上下文header"获取事务的来源代码,会进入HmilyRestTemplateInterceptor#apply方法.

当在try阶段参与者出现超时等异常会进入70行代码进行回滚操作

接下里是参与者的方法,首先会判断当前事务处于哪个阶段,如果在try阶段,那么开始把参与者加入到当前事务上下文里,并更新事务状态,如果当前参与者事务try阶段异常,就删除事务,如果是confirm阶段,就执行confirm,如果是cancel阶段就执行cancel

  1. @Override
  2. public Object handler(final ProceedingJoinPoint point, final TccTransactionContext context) throws Throwable {
  3. TccTransaction tccTransaction = null;
  4. TccTransaction currentTransaction;
  5. switch (TccActionEnum.acquireByCode(context.getAction())) {
  6. case TRYING:
  7. try {
  8. tccTransaction = hmilyTransactionExecutor.beginParticipant(context, point);
  9. final Object proceed = point.proceed();
  10. tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
  11. //update log status to try
  12. hmilyTransactionExecutor.updateStatus(tccTransaction);
  13. return proceed;
  14. } catch (Throwable throwable) {
  15. //if exception ,delete log.
  16. hmilyTransactionExecutor.deleteTransaction(tccTransaction);
  17. assert tccTransaction != null;
  18. TccTransactionCacheManager.getInstance().removeByKey(tccTransaction.getTransId());
  19. throw throwable;
  20. }
  21. case CONFIRMING:
  22. currentTransaction = TccTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
  23. hmilyTransactionExecutor.confirm(currentTransaction);
  24. break;
  25. case CANCELING:
  26. currentTransaction = TccTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
  27. hmilyTransactionExecutor.cancel(currentTransaction);
  28. break;
  29. default:
  30. break;
  31. }
  32. Method method = ((MethodSignature) (point.getSignature())).getMethod();
  33. return DefaultValueUtils.getDefaultValue(method.getReturnType());
  34. }

总结:

  1. 框架的执行核心就是AOP,所有TCC操作都是经过AOP实现的,发起者的AOP环绕前结束时,所有的发起者对应的tcc方法都已经执行完毕
  2. 每个TCC注解的方法,都要实现一个对应的confirm方法和cancel方法
  3. 不管是事务的发起者还是参与者,都是一个阶段都执行完毕了才会进行到下一个阶段,也就是一起pretry,一起try,一起confirm或一起cancel
  4. 对于我们项目里,一般来说service方法执行完毕了,整个业务流程就结束了,但是这个框架不一样,service方法执行完毕只能代码try阶段执行完毕,真正的业务流程结束在confirm里,所以一定要定义好这两个方法
  5. tcc框架有个缺点,就是每个涉及到事务的方法都要实现它的confirm和cancel方法

 

 
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/286163
推荐阅读
相关标签
  

闽ICP备14008679号