赞
踩
原理
TM两阶段:
阶段1:TM向TC申请全局事务,netty客户端发起了一次记录xid的请求
阶段2:TC协调之后,决定执行RM是否提交或者回滚。
利用springboot自动装配机制从spring.factories文件加载自动配置类SeataAutoConfiguration
- org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.seata.spring.boot.autoconfigure.SeataAutoConfiguration
-
- @ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
- @AutoConfigureAfter({SeataCoreAutoConfiguration.class})
- public class SeataAutoConfiguration {
- private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
-
- @Bean(BEAN_NAME_FAILURE_HANDLER)
- @ConditionalOnMissingBean(FailureHandler.class)
- public FailureHandler failureHandler() {
- return new DefaultFailureHandlerImpl();
- }
-
- @Bean
- @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
- @ConditionalOnMissingBean(GlobalTransactionScanner.class)
- public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
- ConfigurableListableBeanFactory beanFactory,
- @Autowired(required = false) List<ScannerChecker> scannerCheckers) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Automatically configure Seata");
- }
-
- // set bean factory
- GlobalTransactionScanner.setBeanFactory(beanFactory);
-
- // add checkers
- // '/META-INF/services/io.seata.spring.annotation.ScannerChecker'
- GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));
- // spring beans
- GlobalTransactionScanner.addScannerCheckers(scannerCheckers);
-
- // add scannable packages
- GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());
- // add excludeBeanNames
- GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());
-
- // create global transaction scanner
- return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
- }
- }
- public class GlobalTransactionScanner extends AbstractAutoProxyCreator
- implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean
初始化TM、RM的netty客户端,提供aop能力
io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet
spring-bean初始化赋值回调,这里初始化TM、RM的netty客户端,为后续发起请求做准备。
- @Override
- public void afterPropertiesSet() {
- if (disableGlobalTransaction) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Global transaction is disabled.");
- }
- ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener)this);
- return;
- }
- if (initialized.compareAndSet(false, true)) {
- initClient();
- }
- }
-
- private void initClient() {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Initializing Global Transaction Clients ... ");
- }
- if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
- throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
- }
- //init TM
- TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
- }
- //init RM
- RMClient.init(applicationId, txServiceGroup);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
- }
-
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Global Transaction Clients are initialized. ");
- }
- registerSpringShutdownHook();
-
- }
io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary,递归调用适配代理,Seata重写了这个方法,TCC代理、AT代理使用的还是spring那一套代理区分方式(jdk还是cglib)
- /**
- * The following will be scanned, and added corresponding interceptor:
- *
- * TM:
- * @see io.seata.spring.annotation.GlobalTransactional // TM annotation
- * Corresponding interceptor:
- * @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler
- *
- * GlobalLock:
- * @see io.seata.spring.annotation.GlobalLock // GlobalLock annotation
- * Corresponding interceptor:
- * @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalLock(MethodInvocation, GlobalLock) // GlobalLock handler
- *
- * TCC mode:
- * @see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface
- * @see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method
- * @see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser
- * Corresponding interceptor:
- * @see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode
- */
- @Override
- protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
- // do checkers
- if (!doCheckers(bean, beanName)) {
- return bean;
- }
-
- try {
- synchronized (PROXYED_SET) {
- if (PROXYED_SET.contains(beanName)) {
- return bean;
- }
- interceptor = null;
- //check TCC proxy
- if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
- // init tcc fence clean task if enable useTccFence
- TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
- //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
- interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
- ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener)interceptor);
- } else {
- Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
- Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
-
- if (!existsAnnotation(new Class[]{serviceInterface})
- && !existsAnnotation(interfacesIfJdk)) {
- return bean;
- }
-
- if (globalTransactionalInterceptor == null) {
- globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
- ConfigurationCache.addConfigListener(
- ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener)globalTransactionalInterceptor);
- }
- interceptor = globalTransactionalInterceptor;
- }
-
- LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
- if (!AopUtils.isAopProxy(bean)) {
- bean = super.wrapIfNecessary(bean, beanName, cacheKey);
- } else {
- AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
- Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
- int pos;
- for (Advisor avr : advisor) {
- // Find the position based on the advisor's order, and add to advisors by pos
- pos = findAddSeataAdvisorPosition(advised, avr);
- advised.addAdvisor(pos, avr);
- }
- }
- PROXYED_SET.add(beanName);
- return bean;
- }
- } catch (Exception exx) {
- throw new RuntimeException(exx);
- }
- }
3、GlobalTransactionalInterceptor类
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。