
Seata AT: one complete TM -> TC -> RM interaction


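Principle

The two phases, seen from the TM:

Phase 1: the TM asks the TC to open a global transaction. The Netty client sends a single request, and the resulting XID is recorded.

Phase 2: after coordinating the branches, the TC decides whether the RMs commit or roll back.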
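The two phases can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `Coordinator`, `begin`, `registerBranch` and `finish` are hypothetical names invented for illustration, not Seata's API, and the real TC talks to the TM and RM over Netty rather than through direct method calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class TwoPhaseSketch {

    /** Outcome the coordinator sends to every branch in phase 2. */
    enum Decision { COMMIT, ROLLBACK }

    /** Hypothetical stand-in for the TC: hands out XIDs and tracks branches. */
    static class Coordinator {
        private final Map<String, List<String>> branchesByXid = new LinkedHashMap<>();

        /** Phase 1, TM side: open a global transaction and return its XID. */
        String begin() {
            String xid = UUID.randomUUID().toString();
            branchesByXid.put(xid, new ArrayList<>());
            return xid;
        }

        /** Phase 1, RM side: register a branch under an existing XID. */
        void registerBranch(String xid, String resourceId) {
            List<String> branches = branchesByXid.get(xid);
            if (branches == null) {
                throw new IllegalStateException("unknown xid: " + xid);
            }
            branches.add(resourceId);
        }

        /** Phase 2: give every branch the same decision, then forget the XID. */
        Map<String, Decision> finish(String xid, boolean businessSucceeded) {
            List<String> branches = branchesByXid.remove(xid);
            if (branches == null) {
                throw new IllegalStateException("unknown xid: " + xid);
            }
            Decision decision = businessSucceeded ? Decision.COMMIT : Decision.ROLLBACK;
            Map<String, Decision> result = new LinkedHashMap<>();
            for (String branch : branches) {
                result.put(branch, decision);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Coordinator tc = new Coordinator();
        String xid = tc.begin();              // TM opens the global transaction
        tc.registerBranch(xid, "order-db");   // each RM joins under the same XID
        tc.registerBranch(xid, "stock-db");
        System.out.println(tc.finish(xid, false)); // business failed, so all branches roll back
    }
}
```

The point of the model is that the XID is the only thing the participants share: every branch is registered under it, and the phase 2 decision is applied to all branches of that XID together.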

Common Spring components

1. The SeataAutoConfiguration class

Spring Boot's auto-configuration mechanism loads the auto-configuration class SeataAutoConfiguration from the spring.factories file.

    # spring.factories
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.seata.spring.boot.autoconfigure.SeataAutoConfiguration

    @ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
    @AutoConfigureAfter({SeataCoreAutoConfiguration.class})
    public class SeataAutoConfiguration {
        private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

        @Bean(BEAN_NAME_FAILURE_HANDLER)
        @ConditionalOnMissingBean(FailureHandler.class)
        public FailureHandler failureHandler() {
            return new DefaultFailureHandlerImpl();
        }

        @Bean
        @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
        @ConditionalOnMissingBean(GlobalTransactionScanner.class)
        public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
                ConfigurableListableBeanFactory beanFactory,
                @Autowired(required = false) List<ScannerChecker> scannerCheckers) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Automatically configure Seata");
            }
            // set bean factory
            GlobalTransactionScanner.setBeanFactory(beanFactory);
            // add checkers
            // '/META-INF/services/io.seata.spring.annotation.ScannerChecker'
            GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));
            // spring beans
            GlobalTransactionScanner.addScannerCheckers(scannerCheckers);
            // add scannable packages
            GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());
            // add excludeBeanNames
            GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());
            // create global transaction scanner
            return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
        }
    }
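The `@ConditionalOnProperty` line on the class decides whether any of this is loaded. With `havingValue = "true"` and `matchIfMissing = true`, the configuration applies when the property is absent or set to `true`, and is skipped for any other value. Below is a minimal plain-Java sketch of that rule. `EnabledFlagSketch` and `matches` are illustrative names, not Spring classes, and the key `seata.enabled` assumes that `SEATA_PREFIX` resolves to `seata`.

```java
import java.util.HashMap;
import java.util.Map;

class EnabledFlagSketch {

    /**
     * Mimics havingValue = "true" with matchIfMissing = true:
     * a missing property matches; a present one must equal "true"
     * (compared case-insensitively, as Spring Boot does).
     */
    static boolean matches(Map<String, String> properties, String key) {
        String value = properties.get(key);
        if (value == null) {
            return true; // matchIfMissing = true
        }
        return "true".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // Assumed key: SEATA_PREFIX + ".enabled"
        System.out.println(matches(properties, "seata.enabled")); // property absent
        properties.put("seata.enabled", "false");
        System.out.println(matches(properties, "seata.enabled")); // explicitly disabled
    }
}
```

In practice this means Seata's auto-configuration is on by default once the starter is on the classpath, and has to be switched off explicitly.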

2. The GlobalTransactionScanner class

    public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean

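This class initializes the Netty clients for the TM and RM, and provides the AOP capability.

2-1. Initializing the TM and RM Netty clients

io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet

This is the Spring bean initialization callback, invoked once the bean's properties have been set. The TM and RM Netty clients are initialized here so that later requests can be sent.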

    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                (ConfigurationChangeListener)this);
            return;
        }
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }
        //init TM
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        //init RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }
        registerSpringShutdownHook();
    }
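The `initialized.compareAndSet(false, true)` guard is what keeps `initClient()` from running more than once, even if the callback is reached from several threads. The idiom can be tried in isolation with the sketch below. `InitOnceSketch` is a made-up class, and its `initClient()` only counts invocations instead of starting any Netty client.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class InitOnceSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger initCount = new AtomicInteger();

    /** Same guard as in the listing: only the first caller flips false -> true. */
    void afterPropertiesSet() {
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    /** Placeholder for the real client start-up; here it only counts calls. */
    private void initClient() {
        initCount.incrementAndGet();
    }

    int initCount() {
        return initCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        InitOnceSketch scanner = new InitOnceSketch();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(scanner::afterPropertiesSet);
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("initClient ran " + scanner.initCount() + " time(s)");
    }
}
```

Because `compareAndSet` is atomic, exactly one caller sees the `false` to `true` transition, so no lock is needed around the initialization.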

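2-2. The AOP proxy implementation

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary: Seata overrides this method to attach its own interceptor, then calls back into the parent implementation to create or adapt the proxy. Both the TCC proxy and the AT proxy still rely on Spring's usual way of choosing the proxy type (JDK dynamic proxy or CGLIB).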

    /**
     * The following will be scanned, and added corresponding interceptor:
     *
     * TM:
     * @see io.seata.spring.annotation.GlobalTransactional // TM annotation
     * Corresponding interceptor:
     * @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler
     *
     * GlobalLock:
     * @see io.seata.spring.annotation.GlobalLock // GlobalLock annotation
     * Corresponding interceptor:
     * @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalLock(MethodInvocation, GlobalLock) // GlobalLock handler
     *
     * TCC mode:
     * @see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface
     * @see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method
     * @see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser
     * Corresponding interceptor:
     * @see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode
     */
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // do checkers
        if (!doCheckers(bean, beanName)) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // init tcc fence clean task if enable useTccFence
                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)interceptor);
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    int pos;
                    for (Advisor avr : advisor) {
                        // Find the position based on the advisor's order, and add to advisors by pos
                        pos = findAddSeataAdvisorPosition(advised, avr);
                        advised.addAdvisor(pos, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
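The core idea of the non-TCC branch, "wrap the bean only if one of its types carries the annotation, and let the interceptor act only on annotated methods", can be reproduced with a plain JDK dynamic proxy. The sketch below is a simplified illustration, not Seata's implementation. `DemoGlobalTransactional`, `OrderService` and the string log are hypothetical stand-ins, and Spring's advisor chain and CGLIB path are left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxySketch {

    /** Hypothetical marker annotation standing in for @GlobalTransactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoGlobalTransactional {}

    interface OrderService {
        @DemoGlobalTransactional
        String create(String item);

        String query(String id);
    }

    static class OrderServiceImpl implements OrderService {
        public String create(String item) { return "created " + item; }
        public String query(String id) { return "order " + id; }
    }

    /** True if any public method of the given types carries the marker annotation. */
    static boolean existsAnnotation(Class<?>... types) {
        for (Class<?> type : types) {
            for (Method method : type.getMethods()) {
                if (method.isAnnotationPresent(DemoGlobalTransactional.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Returns the bean untouched unless an interface method is annotated. */
    static Object wrapIfNecessary(Object bean, List<String> log) {
        Class<?>[] interfaces = bean.getClass().getInterfaces();
        if (!existsAnnotation(interfaces)) {
            return bean;
        }
        InvocationHandler interceptor = (proxy, method, args) -> {
            boolean transactional = method.isAnnotationPresent(DemoGlobalTransactional.class);
            if (transactional) {
                log.add("begin");
            }
            try {
                Object result = method.invoke(bean, args);
                if (transactional) {
                    log.add("commit");
                }
                return result;
            } catch (InvocationTargetException e) {
                if (transactional) {
                    log.add("rollback");
                }
                throw e.getCause(); // rethrow the business exception itself
            }
        };
        return Proxy.newProxyInstance(bean.getClass().getClassLoader(), interfaces, interceptor);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        OrderService service = (OrderService) wrapIfNecessary(new OrderServiceImpl(), log);
        service.query("42");     // not annotated: passes straight through
        service.create("book");  // annotated: wrapped in begin/commit
        System.out.println(log);
    }
}
```

In the real scanner the same decision is made by `existsAnnotation`, and the resulting interceptor is handed to Spring's proxy machinery instead of a hand-written `InvocationHandler`.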

3. The GlobalTransactionalInterceptor class
