  • TM(事务管理器):用来控制整个分布式事务的管理,发起全局事务的Begin/Commit/Rollback
  • RM(资源管理器):用来注册自己的分支事务,接受TCCommit或者Rollback请求.


首先我们来介绍一些Seata-clientSpring模块,Seata通过这个模块对自己的TMRM进行初始化以及扫描AT模式和TCC模式的注解并初始化这些模式需要的资源。 在Seata的项目中有一个spring模块,里面包含了我们和spring相关的逻辑,GlobalTransactionScanner是其中的核心类:

  1. public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean,ApplicationContextAware,
  2. DisposableBean



  1. protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
  2. if (PROXYED_SET.contains(beanName)) {
  3. return bean;
  4. }
  5. interceptor = null;
  6. //check TCC proxy
  7. if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
  8. //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
  9. interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
  10. } else {
  11. Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
  12. Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
  13. if (!existsAnnotation(new Class[]{serviceInterface})
  14. && !existsAnnotation(interfacesIfJdk)) {
  15. return bean;
  16. }
  17. if (interceptor == null) {
  18. interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
  19. }
  20. }
  21. if (!AopUtils.isAopProxy(bean)) {
  22. bean = super.wrapIfNecessary(bean, beanName, cacheKey);
  23. } else {
  24. AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
  25. Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
  26. for (Advisor avr : advisor) {
  27. advised.addAdvisor(0, avr);
  28. }
  29. }
  30. PROXYED_SET.add(beanName);
  31. return bean;
  32. }
  • Step1:检查当前beanName是否已经处理过 如果处理过本次就不处理。
  • Step2:根据注解,找到对应模式的Inteceptor,这里有三种情况第一个TCC,第二个是全局事务管理TM的拦截器,第三个是没有注解,如果没有那么直接返回即可。
  • Step3:将对应的interceptor添加进入当前Bean


  1. public void afterPropertiesSet() {
  2. initClient();
  3. }
  4. private void initClient() {
  5. //init TM
  6. TMClient.init(applicationId, txServiceGroup);
  7. //init RM
  8. RMClient.init(applicationId, txServiceGroup);
  9. registerSpringShutdownHook();
  10. }
  11. private void registerSpringShutdownHook() {
  12. if (applicationContext instanceof ConfigurableApplicationContext) {
  13. ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
  14. ShutdownHook.removeRuntimeShutdownHook();
  15. }
  16. ShutdownHook.getInstance().addDisposable(TmRpcClient.getInstance(applicationId, txServiceGroup));
  17. ShutdownHook.getInstance().addDisposable(RmRpcClient.getInstance(applicationId, txServiceGroup));
  18. }


  • Step1:初始化TM客户端,这里会向Server注册该TM
  • Step2:初始化RM客户端,这里会向Server注册该RM
  • Step3:注册ShutdownHook,后续将TMRM优雅关闭。


2.1 Interceptor


2.1.1 GlobalTransactionalInterceptor


  1. public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
  2. Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
  3. Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
  4. final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
  5. final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
  6. final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
  7. if (globalTransactionalAnnotation != null) {
  8. return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
  9. } else if (globalLockAnnotation != null) {
  10. return handleGlobalLock(methodInvocation);
  11. } else {
  12. return methodInvocation.proceed();
  13. }
  14. }
  • Step1:从代理类中获取到原始的Method
  • Step2: 获取Method中的注解
  • Step3: 如果有@GlobalTransactional注解执行handleGlobalTransaction切面逻辑,这个也是我们全局事务的逻辑。
  • Step4: 如果有@GlobalLock注解,则执行handleGlobalLock切面逻辑,这个注解是用于一些非AT模式的数据库加锁,加上这个注解之后再执行Sql语句之前会查询对应的数据是否加锁,但是他不会加入全局事务。


  1. private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
  2. final GlobalTransactional globalTrxAnno) throws Throwable {
  3. return transactionalTemplate.execute(new TransactionalExecutor() {
  4. @Override
  5. public Object execute() throws Throwable {
  6. return methodInvocation.proceed();
  7. }
  8. });
  9. }
  10. TransactionalTemplate#execute
  11. public Object execute(TransactionalExecutor business) throws Throwable {
  12. // 1. get or create a transaction
  13. GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
  14. // 1.1 get transactionInfo
  15. TransactionInfo txInfo = business.getTransactionInfo();
  16. if (txInfo == null) {
  17. throw new ShouldNeverHappenException("transactionInfo does not exist");
  18. }
  19. try {
  20. // 2. begin transaction
  21. beginTransaction(txInfo, tx);
  22. Object rs = null;
  23. try {
  24. // Do Your Business
  25. rs = business.execute();
  26. } catch (Throwable ex) {
  27. // 3.the needed business exception to rollback.
  28. completeTransactionAfterThrowing(txInfo,tx,ex);
  29. throw ex;
  30. }
  31. // 4. everything is fine, commit.
  32. commitTransaction(tx);
  33. return rs;
  34. } finally {
  35. //5. clear
  36. triggerAfterCompletion();
  37. cleanUp();
  38. }
  39. }


  • Step1:获取当前的全局事务,如果没有则创建。
  • Step2:获取业务中的事务信息包含超时时间等。
  • Step3:开启全局事务
  • Step4:如果有异常抛出处理异常,rollback。
  • Step5:如果没有异常那么commit全局事务。
  • Step6:清除当前事务上下文信息。

2.1.2 TccActionInterceptor


  1. @TwoPhaseBusinessAction(name = "TccActionOne" , commitMethod = "commit", rollbackMethod = "rollback")
  2. public boolean prepare(BusinessActionContext actionContext, int a);
  3. public boolean commit(BusinessActionContext actionContext);
  4. public boolean rollback(BusinessActionContext actionContext);



  1. public Object invoke(final MethodInvocation invocation) throws Throwable {
  2. Method method = getActionInterfaceMethod(invocation);
  3. TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
  4. //try method
  5. if(businessAction != null) {
  6. if(StringUtils.isBlank(RootContext.getXID())){
  7. //not in distribute transaction
  8. return invocation.proceed();
  9. }
  10. Object[] methodArgs = invocation.getArguments();
  11. //Handler the TCC Aspect
  12. Map<String, Object> ret = actionInterceptorHandler.proceed(method, methodArgs, businessAction, new Callback<Object>(){
  13. @Override
  14. public Object execute() throws Throwable {
  15. return invocation.proceed();
  16. }
  17. });
  18. //return the final result
  19. return ret.get(Constants.TCC_METHOD_RESULT);
  20. }
  21. return invocation.proceed();
  22. }
  • Step1:获取原始Method
  • Step2:判断是否再全局事务中,也就是整个逻辑服务最外层是否执行了GlobalTransactionalInterceptor。如果不再直接执行即可。
  • Step3:执行TCC切面,核心逻辑在actionInterceptorHandler#proceed中。


  1. public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
  2. Map<String, Object> ret = new HashMap<String, Object>(16);
  3. //TCC name
  4. String actionName = businessAction.name();
  5. String xid = RootContext.getXID();
  6. BusinessActionContext actionContext = new BusinessActionContext();
  7. actionContext.setXid(xid);
  8. //set action anme
  9. actionContext.setActionName(actionName)
  10. //Creating Branch Record
  11. String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
  12. actionContext.setBranchId(branchId);
  13. //set the parameter whose type is BusinessActionContext
  14. Class<?>[] types = method.getParameterTypes();
  15. int argIndex = 0;
  16. for (Class<?> cls : types) {
  17. if (cls.getName().equals(BusinessActionContext.class.getName())) {
  18. arguments[argIndex] = actionContext;
  19. break;
  20. }
  21. argIndex++;
  22. }
  23. //the final parameters of the try method
  24. ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
  25. //the final result
  26. ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
  27. return ret;
  28. }
  • Step1:获取一些事务信息,比如TCC名字,本次事务XID等。
  • Step2:创建Branch事务,一个是在本地的context上下文中将它的commitrollback信息保存起来,另一个是向我们的Seata-Server注册分支事务,用于后续的管理。
  • Step3:填充方法参数,也就是我们的BusinessActionContext

2.2 小结


3. TM 事务管理器


  1. public class DefaultTransactionManager implements TransactionManager {
  2. @Override
  3. public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
  4. throws TransactionException {
  GlobalBeginRequest request = new GlobalBeginRequest();
  6. request.setTransactionName(name);
  7. request.setTimeout(timeout);
  8. GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
  9. return response.getXid();
  10. }
  11. @Override
  12. public GlobalStatus commit(String xid) throws TransactionException {
  13. GlobalCommitRequest globalCommit = new GlobalCommitRequest();
  14. globalCommit.setXid(xid);
  15. GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
  16. return response.getGlobalStatus();
  17. }
  18. @Override
  19. public GlobalStatus rollback(String xid) throws TransactionException {
  20. GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
  21. globalRollback.setXid(xid);
  22. GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
  return response.getGlobalStatus();
  24. }
  25. @Override
  26. public GlobalStatus getStatus(String xid) throws TransactionException {
  27. GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
  28. queryGlobalStatus.setXid(xid);
  29. GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);
  30. return response.getGlobalStatus();
  31. }
  32. private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
  33. try {
  return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
  35. } catch (TimeoutException toe) {
  36. throw new TransactionException(TransactionExceptionCode.IO, toe);
  37. }
  38. }
  39. }


  • begin:向Server发起GlobalBeginRequest请求,用于开启全局事务。
  • commit:向Server发起GlobalCommitRequest请求,用于提交全局事务。
  • rollback:向Server发起GlobalRollbackRequest请求,用于回滚全局事务。
  • getStatus:向Server发起GlobalStatusRequest请求,用于查询全局事务状态信息。

4. RM 资源管理器


4.1 AT 资源管理



  • Step 1:从数据源中获取数据库连接。
  • Step 2: 从连接中获取Statement
  • Step 3: 通过Statement执行我们的sql语句


4.1.1 DataSourceProxy


  1. public ConnectionProxy getConnection() throws SQLException {
  2. Connection targetConnection = targetDataSource.getConnection();
  3. return new ConnectionProxy(this, targetConnection);
  4. }


4.1.2 ConnectionProxy



  1. @Override
  2. public Statement createStatement() throws SQLException {
  3. Statement targetStatement = getTargetConnection().createStatement();
  4. return new StatementProxy(this, targetStatement);
  5. }
  6. @Override
  public PreparedStatement prepareStatement(String sql) throws SQLException {
  8. PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
  9. return new PreparedStatementProxy(this, targetPreparedStatement, sql);
  10. }



  1. /**
  2. * append sqlUndoLog
  3. *
  4. * @param sqlUndoLog the sql undo log
  5. */
  public void appendUndoLog(SQLUndoLog sqlUndoLog) {
  7. context.appendUndoItem(sqlUndoLog);
  8. }
  9. /**
  10. * append lockKey
  11. *
  12. * @param lockKey the lock key
  13. */
  14. public void appendLockKey(String lockKey) {
  15. context.appendLockKey(lockKey);
  16. }



  1. public void commit() throws SQLException {
  public void commit() throws SQLException {
  2. if (context.inGlobalTransaction()) {
  3. processGlobalTransactionCommit();
  4. } else if (context.isGlobalLockRequire()) {
  5. processLocalCommitWithGlobalLocks();
  6. } else {
  7. targetConnection.commit();
  8. }
  9. }
  10. private void processGlobalTransactionCommit() throws SQLException {
  11. try {
  12. register();
  13. } catch (TransactionException e) {
  recognizeLockKeyConflictException(e);
  15. }
  16. try {
  17. if (context.hasUndoLog()) {
  18. UndoLogManager.flushUndoLogs(this);
  19. }
  20. targetConnection.commit();
  21. } catch (Throwable ex) {
  22. report(false);
  23. if (ex instanceof SQLException) {
  24. throw new SQLException(ex);
  25. }
  26. }
  27. report(true);
  28. context.reset();
  29. }
  • Step 1:判断context是否再全局事务中,如果在则进行提交,到Step2。
  • Step 2: 注册分支事务并加上全局锁,如果全局锁加锁失败则抛出异常。
  • Step 3: 如果context中有undolog,那么将Unlog刷至数据库。
  • Step 4: 提交本地事务。
  • Step 5:报告本地事务状态,如果出现异常则报告失败,如果没有问题则报告正常。



  1. public void rollback() throws SQLException {
  2. targetConnection.rollback();
  targetConnection.rollback();
  3. if (context.inGlobalTransaction()) {
  4. if (context.isBranchRegistered()) {
  5. report(false);
  6. }
  7. }
  8. context.reset();
  9. }
  • Step 1:首先提交本地事务。
  • Step 2:判断是否在全局事务中。
  • Step 3:如果在则判断分支事务是否已经注册。
  • Step 4: 如果已经注册那么直接向客户端报告该事务失败异常。


4.1.3 StatementProxy


  1. public boolean execute(String sql) throws SQLException {
  2. this.targetSQL = sql;
  3. return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
  4. @Override
  5. public Boolean execute(T statement, Object... args) throws SQLException {
  6. return statement.execute((String) args[0]);
  7. }
  8. }, sql);
  9. }


  1. public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
  2. StatementProxy<S> statementProxy,
  3. StatementCallback<T, S> statementCallback,
  4. Object... args) throws SQLException {
  5. if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
  6. // Just work as original statement
  7. return statementCallback.execute(statementProxy.getTargetStatement(), args);
  8. }
  9. if (sqlRecognizer == null) {
  10. sqlRecognizer = SQLVisitorFactory.get(
  11. statementProxy.getTargetSQL(),
  12. statementProxy.getConnectionProxy().getDbType());
  13. }
  14. Executor<T> executor = null;
  15. if (sqlRecognizer == null) {
  16. executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
  17. } else {
  18. switch (sqlRecognizer.getSQLType()) {
  19. case INSERT:
  20. executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
  21. break;
  22. case UPDATE:
  23. executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
  24. break;
  25. case DELETE:
  26. executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
  27. break;
  29. executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
  30. break;
  31. default:
  32. executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
  33. break;
  34. }
  35. }
  36. T rs = null;
  37. try {
  38. rs = executor.execute(args);
  39. } catch (Throwable ex) {
  40. if (!(ex instanceof SQLException)) {
  41. // Turn other exception into SQLException
  42. ex = new SQLException(ex);
  43. }
  44. throw (SQLException)ex;
  45. }
  46. return rs;
  47. }
  48. }


  • Step 1:如果不在全局事务且不需要查询全局锁,那么就直接执行原始的Statement
  • Step 2: 如果没有传入sql识别器,那么我们需要生成sql识别器,这里我们会借用Druid中对sql的解析,我们获取sql的识别器,我们通过这个识别器可以获取到不同类型的sql语句的一些条件,比如说SQLUpdateRecognizer是用于updatesql识别器,我们可以直接获取到表名,条件语句,更新的字段,更新字段的值等。
  • Step 3:根据sql识别器的类型,来生成我们不同类型的执行器。
  • Step 4:通过第三步中的执行器来执行我们的sql语句。



  1. protected T executeAutoCommitFalse(Object[] args) throws Throwable {
  2. TableRecords beforeImage = beforeImage();
  3. T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
  4. TableRecords afterImage = afterImage(beforeImage);
  5. prepareUndoLog(beforeImage, afterImage);
  6. return result;
  7. }
  8. protected abstract TableRecords beforeImage() throws SQLException;
  9. protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;


  • Step 1:获取执行当前sql之前所受影响行的快照,这里beforeImage会被不同类型的sql语句重新实现。
  • Step 2:执行当前sql语句,并获取结果。
  • Step 3:获取执行sql之后的快照,这里的afterIamge也会被不同类型的sql语句重新实现。
  • Step 4:将undolog准备好,这里会保存到我们的ConnectionContext中。
  1. protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
  2. if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
  3. return;
  4. }
  5. ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
  6. TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
  7. String lockKeys = buildLockKey(lockKeyRecords);
  8. connectionProxy.appendLockKey(lockKeys);
  9. SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
  10. connectionProxy.appendUndoLog(sqlUndoLog);
  11. }


4.1.4 分支事务的提交和回滚


  1. public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
  2. return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
  3. }
  4. public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
  5. if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
  6. LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
  7. }
  8. return BranchStatus.PhaseTwo_Committed;
  9. }




  1. public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
  2. DataSourceProxy dataSourceProxy = get(resourceId);
  3. if (dataSourceProxy == null) {
  4. throw new ShouldNeverHappenException();
  5. }
  6. try {
  7. UndoLogManager.undo(dataSourceProxy, xid, branchId);
  8. } catch (TransactionException te) {
  9. if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
  10. return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
  11. } else {
  12. return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
  13. }
  14. }
  15. return BranchStatus.PhaseTwo_Rollbacked;
  16. }


4.2 TCC 资源管理


  1. public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
  2. String applicationData) throws TransactionException {
  3. TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
  4. if (tccResource == null) {
  5. throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
  6. }
  7. Object targetTCCBean = tccResource.getTargetBean();
  8. Method commitMethod = tccResource.getCommitMethod();
  9. if (targetTCCBean == null || commitMethod == null) {
  10. throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
  11. }
  12. boolean result = false;
  13. //BusinessActionContext
  14. BusinessActionContext businessActionContext =
  15. getBusinessActionContext(xid, branchId, resourceId, applicationData);
  16. Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
  17. LOGGER.info("TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:" +
  18. resourceId);
  19. if (ret != null && ret instanceof TwoPhaseResult) {
  20. result = ((TwoPhaseResult) ret).isSuccess();
  21. } else {
  22. result = (boolean) ret;
  23. }
  24. return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
  25. }


  • Step 1:首先查找当前服务是否有该TCC资源,如果没有抛出异常。
  • Step 2:然后找到我们的TCC对象和对应的commit方法。
  • Step 3:然后执行我们的commit方法。
  • Step 4:最后将结果返回给我们的Server,由Server决定是否重试。





