赞
踩
这里就不罗列TCC事务的优点了,奉上该项目的地址https://github.com/yu199195/hmily
分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点上,
本质上来说,分布式事务是为了保证不同数据库的数据一致性
TCC事务主要是基于AOP切面拦截实现的三阶段提交事务,下面我们来跟读源码
解决多个微服务之间数据可能不一致的问题,一般有个CAP理论:
CAP原理证明,任何分布式系统只可同时满足以上两点,无法三者兼顾.由于关系型数据库是单节点无复制的,因此不具有分区容忍性,但具有一致性和可用性,而分布式的微服务系统都必须满足分区容忍性,SpringCloud中的Eureka就是A P 定理的结合
项目结构:
1.hmily-tcc-admin:用来监控事务日志
2.hmily-tcc-annotation:提供tcc aop注解
3.hmily-tcc-common:提供一些工具类和配置类
4.hmily-tcc-core:分布式事务执行的核心部分
这个框架主要模拟一个场景,下单---->新建订单---->扣款----->减库存的操作,涉及到3个库
tcc分布式事务主要是基于AOP切面原理进行注解拦截三阶段提交(try-confirm-cancel)的分布式事务,下面我们进行源码跟读:
HmilyTransactionBootstrap启动引导类
- /**
- * tcc分布式事务框架注解.
- * @author xiaoyu
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ElementType.METHOD})
- public @interface Tcc {
-
- /**
- * spring事务传播.
- * @return {@linkplain PropagationEnum}
- */
- PropagationEnum propagation() default PropagationEnum.PROPAGATION_REQUIRED;
-
- /**
- * tcc框架确认方法 tcc中第一个c.
- *
- * @return confirm方法名称
- */
- String confirmMethod() default "";
-
- /**
- * tcc框架确认方法 tcc中第二个c.
- *
- * @return cancel方法名称
- */
- String cancelMethod() default "";
-
- /**
- * 模式 tcc 和cc模式.
- * tcc模式代表try中有数据库操作,try需要回滚.
- * cc模式代表try中无数据库操作,try不需要回滚.
- *
- * @return {@linkplain TccPatternEnum}
- */
- TccPatternEnum pattern() default TccPatternEnum.TCC;
-
- }
该注解通过类AbstractTccTransactionAspect的子类SpringCloudHmilyTransactionAspect拦截,切点是该注解,环绕执行的是TccTransactionInterceptor实现类的interceptor方法,当方法上有tcc注解时,都会执行该切面拦截器方法,
- @Aspect
- public abstract class AbstractTccTransactionAspect {
-
- private TccTransactionInterceptor tccTransactionInterceptor;
-
- protected void setTccTransactionInterceptor(final TccTransactionInterceptor tccTransactionInterceptor) {
- this.tccTransactionInterceptor = tccTransactionInterceptor;
- }
-
- /**
- * this is point cut with {@linkplain com.hmily.tcc.annotation.Tcc }.
- */
- @Pointcut("@annotation(com.hmily.tcc.annotation.Tcc)")
- public void hmilyTccInterceptor() {
- }
-
- /**
- * this is around in {@linkplain com.hmily.tcc.annotation.Tcc }.
- * @param proceedingJoinPoint proceedingJoinPoint
- * @return Object
- * @throws Throwable Throwable
- */
- @Around("hmilyTccInterceptor()")
- public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
- return tccTransactionInterceptor.interceptor(proceedingJoinPoint);
- }
-
- /**
- * spring Order.
- *
- * @return int
- */
- public abstract int getOrder();
- }
子类会重写setTccTransactionInterceptor方法,程序启动就注入了SpringCloudHmilyTransactionInterceptor
接而进入到SpringCloudHmilyTransactionInterceptor的interceptor方法,该方法内定义TccTransactionContext对象,该对象定义了事务上下文,用来存储本地事务,包含事务Id,事务阶段,事务参与者的角色
try 阶段,也就是本地事务阶段,还未发起远程调用,AOP拦截到@TCC注解,此时TccTransactionContext事务上下文为空
- @Override
- public Object interceptor(final ProceedingJoinPoint pjp) throws Throwable {
- TccTransactionContext tccTransactionContext;
- //如果不是本地反射调用补偿
- RequestAttributes requestAttributes = null;
- try {
- requestAttributes = RequestContextHolder.currentRequestAttributes();
- } catch (Throwable ex) {
- LogUtil.warn(LOGGER, () -> "can not acquire request info:" + ex.getLocalizedMessage());
- }
-
- HttpServletRequest request = requestAttributes == null ? null : ((ServletRequestAttributes) requestAttributes).getRequest();
- String context = request == null ? null : request.getHeader(CommonConstant.TCC_TRANSACTION_CONTEXT);
- if (StringUtils.isNoneBlank(context)) {
- tccTransactionContext = GsonUtils.getInstance().fromJson(context, TccTransactionContext.class);
- } else {
- tccTransactionContext = TransactionContextLocal.getInstance().get();
- if (Objects.nonNull(tccTransactionContext)) {
- tccTransactionContext.setRole(TccRoleEnum.SPRING_CLOUD.getCode());
- }
- }
- return hmilyTransactionAspectService.invoke(tccTransactionContext, pjp);
- }
事务拦截器首先会获取当前请求,从请求头里获取事务上下文后和切面参数一起进入事务service切面的invoke方法,关于事务上下文header请看HmilyRestTemplateInterceptor类
- @Override
- public Object invoke(final TccTransactionContext tccTransactionContext, final ProceedingJoinPoint point) throws Throwable {
- final Class clazz = hmilyTransactionFactoryService.factoryOf(tccTransactionContext);
- final HmilyTransactionHandler txTransactionHandler = (HmilyTransactionHandler) SpringBeanUtils.getInstance().getBean(clazz);
- return txTransactionHandler.handler(point, tccTransactionContext);
- }
该方法会根据事务上下文获取当前事务处于发起者还是参与者阶段,不同阶段返回不同的HmilyTransactionHandler子类,它由3实现类分别是发起者事务处理,参与者事务处理和本地事务处理,一般最常用的是发起者和参与者
由上述debug可知,在发起阶段由SpringCloudHmilyTransactionInterceptor的invoke方法传递过来的事务上下文是null,所以是发起者,begin是初始化事务,当point proceed执行完后,更新事务状态为try阶段,并更新事务日志.
该方法维护了一个TransactionContextLocal,用来存储TccTransactionContext事务上下文,内部是threadlocal,可以将事务上下文绑定到当前线程里.
如果出了异常,则在StarterHmilyTransactionHandler#handler方法catch块执行cancel方法,如果没有异常,就执行confirm方法确认tcc事务完成
- @Override
- public Object handler(final ProceedingJoinPoint point, TccTransactionContext context)
- throws Throwable {
- Object returnValue = null;
- try {
- TccTransaction tccTransaction;
- context = TransactionContextLocal.getInstance().get();
- if (context == null) {
- tccTransaction = hmilyTransactionExecutor.begin(point);
- try {
- //execute try
- returnValue = point.proceed();
- tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
- hmilyTransactionExecutor.updateStatus(tccTransaction);
- } catch (Throwable throwable) {
- //if exception ,execute cancel
- hmilyTransactionExecutor
- .cancel(hmilyTransactionExecutor.getCurrentTransaction());
- throw throwable;
- }
- //execute confirm
- hmilyTransactionExecutor.confirm(hmilyTransactionExecutor.getCurrentTransaction());
- } else if (context.getAction() == TccActionEnum.CONFIRMING.getCode()) {
- //execute confirm
- hmilyTransactionExecutor.confirm(hmilyTransactionExecutor.getCurrentTransaction());
- }
-
- } finally {
- hmilyTransactionExecutor.remove();
- }
- return returnValue;
- }
当发起者执行ponit.proceed完毕意味着包括了多个参与者的tcc方法也一起执行完毕了.
接下来是消费者的方法,一样拦截tcc注解和上面代码一样走
在这里返回的是消费者的处理器
接着放行执行代码feign,进入feign的拦截器,由于SpringCloudHmilyTransactionAspect切面设置了最高级的排序所以会比Feign拦截器切面优先执行,下面来看HmilyRestTemplateConfiguration自定义的feign拦截器
第60行代码就是我们上面说的从请求通过"事务上下文header"获取事务的来源代码,会进入HmilyRestTemplateInterceptor#apply方法.
当在try阶段参与者出现超时等异常会进入70行代码进行回滚操作
接下里是参与者的方法,首先会判断当前事务处于哪个阶段,如果在try阶段,那么开始把参与者加入到当前事务上下文里,并更新事务状态,如果当前参与者事务try阶段异常,就删除事务,如果是confirm阶段,就执行confirm,如果是cancel阶段就执行cancel
- @Override
- public Object handler(final ProceedingJoinPoint point, final TccTransactionContext context) throws Throwable {
- TccTransaction tccTransaction = null;
- TccTransaction currentTransaction;
- switch (TccActionEnum.acquireByCode(context.getAction())) {
- case TRYING:
- try {
- tccTransaction = hmilyTransactionExecutor.beginParticipant(context, point);
- final Object proceed = point.proceed();
- tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
- //update log status to try
- hmilyTransactionExecutor.updateStatus(tccTransaction);
- return proceed;
- } catch (Throwable throwable) {
- //if exception ,delete log.
- hmilyTransactionExecutor.deleteTransaction(tccTransaction);
- assert tccTransaction != null;
- TccTransactionCacheManager.getInstance().removeByKey(tccTransaction.getTransId());
- throw throwable;
- }
- case CONFIRMING:
- currentTransaction = TccTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
- hmilyTransactionExecutor.confirm(currentTransaction);
- break;
- case CANCELING:
- currentTransaction = TccTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
- hmilyTransactionExecutor.cancel(currentTransaction);
- break;
- default:
- break;
- }
- Method method = ((MethodSignature) (point.getSignature())).getMethod();
- return DefaultValueUtils.getDefaultValue(method.getReturnType());
- }
总结:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。