当前位置:   article > 正文

使用JTA解决多数据源事务问题_transaction 192.168.8.146.tm169672573297900013 has

transaction 192.168.8.146.tm169672573297900013 has timed out and will rollba

分享知识 传递快乐

 

在一些复杂的应用开发中,一个应用可能会涉及到连接多个数据源,所谓多数据源可以简单理解为至少连接两个及以上的数据库。在动手之前最好先了解对JTA有个了解,可参考:浅谈 JTA 事务

 

项目环境

  • springboot  2.3.1
  • mybatis plus
  • jta

项目依赖

pom.xml中关键依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <!--JTA组件核心依赖-->
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-jta-atomikos</artifactId>
  9. </dependency>
  10. <!-- mysql -->
  11. <dependency>
  12. <groupId>mysql</groupId>
  13. <artifactId>mysql-connector-java</artifactId>
  14. <version>8.0.19</version>
  15. </dependency>
  16. <!-- Druid 数据源 -->
  17. <dependency>
  18. <groupId>com.alibaba</groupId>
  19. <artifactId>druid-spring-boot-starter</artifactId>
  20. <version>1.1.22</version>
  21. </dependency>
  22. <!-- mybatis plus -->
  23. <dependency>
  24. <groupId>com.baomidou</groupId>
  25. <artifactId>mybatis-plus-boot-starter</artifactId>
  26. <version>3.3.2</version>
  27. </dependency>

 

配置多数据源

application.yml

  1. spring:
  2. profiles:
  3. active: local
  4. application:
  5. name: jta-center
  6. datasource:
  7. type: com.alibaba.druid.pool.DruidDataSource
  8. test-query: SELECT 1
  9. min-pool-size: 5
  10. max-pool-size: 20
  11. max-life-time: 0
  12. dynamic:
  13. datasource:
  14. one:
  15. driver-class-name: com.mysql.cj.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
  16. url: jdbc:mysql://127.0.0.1:3306/one?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&serverTimezone=Asia/Shanghai
  17. username: root
  18. password: root
  19. tow:
  20. driver-class-name: com.mysql.cj.jdbc.Driver
  21. url: jdbc:mysql://127.0.0.1:3306/tow?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&serverTimezone=Asia/Shanghai
  22. username: root
  23. password: root

配置 DataSource

创建事务数据源

  1. package com.jta.demo.common.dynamic;
  2. import com.baomidou.mybatisplus.core.MybatisConfiguration;
  3. import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
  4. import com.mysql.cj.jdbc.MysqlXADataSource;
  5. import org.apache.ibatis.session.SqlSessionFactory;
  6. import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
  7. import org.springframework.core.env.Environment;
  8. import javax.sql.DataSource;
  9. /**
  10. * Title: 事务数据源
  11. * Description:
  12. *
  13. */
  14. public class AtomikosDataSourceCreator {
  15. public static final String DATA_SOURCE_PREFIX = "spring.datasource.";
  16. /**
  17. * 创建数据源
  18. * <p>
  19. * 创建AtomikosDataSourceBean是使用Atomikos连接池的首选类
  20. *
  21. * @param environment
  22. * @param uniqueResourceName
  23. * @param dataBase
  24. * @return
  25. */
  26. public static AtomikosDataSourceBean createAtomikosDataSourceBean(Environment environment, String uniqueResourceName, String dataBase) {
  27. // 设置数据库连
  28. MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
  29. mysqlXaDataSource.setDatabaseName(environment.getProperty(dataBase + "name"));
  30. mysqlXaDataSource.setURL(environment.getProperty(dataBase + "url"));
  31. mysqlXaDataSource.setUser(environment.getProperty(dataBase + "username"));
  32. mysqlXaDataSource.setPassword(environment.getProperty(dataBase + "password"));
  33. // 事务管理器
  34. AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
  35. // 数据源唯一标识
  36. xaDataSource.setUniqueResourceName(uniqueResourceName);
  37. // XADataSource实现类,使用DruidXADataSource
  38. xaDataSource.setXaDataSourceClassName(environment.getProperty(DATA_SOURCE_PREFIX + "type"));
  39. // 最小连接数,默认1
  40. xaDataSource.setMinPoolSize(environment.getProperty(DATA_SOURCE_PREFIX + "min-pool-size", Integer.class));
  41. // 最大连接数,默认1
  42. xaDataSource.setMaxPoolSize(environment.getProperty(DATA_SOURCE_PREFIX + "max-pool-size", Integer.class));
  43. // 设置连接在池中被自动销毁之前保留的最大秒数。 可选,默认为0(无限制)。
  44. xaDataSource.setMaxLifetime(environment.getProperty(DATA_SOURCE_PREFIX + "max-life-time", Integer.class));
  45. // 返回连接前用于测试连接的SQL查询
  46. xaDataSource.setTestQuery(environment.getProperty(DATA_SOURCE_PREFIX + "test-query"));
  47. xaDataSource.setBorrowConnectionTimeout(60);
  48. xaDataSource.setXaDataSource(mysqlXaDataSource);
  49. return xaDataSource;
  50. }
  51. /**
  52. * 创建SqlSessionFactory实例
  53. */
  54. public static SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
  55. /**
  56. * 必须使用MybatisSqlSessionFactoryBean,
  57. * 不能使用SqlSessionFactoryBean,不然会报invalid bound statement (not found)
  58. *
  59. * com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration#sqlSessionFactory(javax.sql.DataSource)
  60. * 源码中也是使用MybatisSqlSessionFactoryBean
  61. * 并且源码中使用了@ConditionalOnMissingBean,即IOC中如果存在了SqlSessionFactory实例,mybatis-plus就不创建SqlSessionFactory实例了
  62. */
  63. MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
  64. sessionFactoryBean.setDataSource(dataSource);
  65. MybatisConfiguration configuration = new MybatisConfiguration();
  66. sessionFactoryBean.setConfiguration(configuration);
  67. return sessionFactoryBean.getObject();
  68. }
  69. }

创建 one 数据源

  1. package com.jta.demo.common.dynamic.datasource;
  2. import com.jta.demo.common.dynamic.AtomikosDataSourceCreator;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.ibatis.session.SqlSessionFactory;
  5. import org.mybatis.spring.annotation.MapperScan;
  6. import org.springframework.beans.factory.annotation.Qualifier;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.context.annotation.Primary;
  10. import org.springframework.core.env.Environment;
  11. import javax.sql.DataSource;
  12. @Slf4j
  13. @Configuration
  14. @MapperScan(basePackages = "com.jta.demo.mappers.one.mapper", sqlSessionFactoryRef = OneDataSourcesConfiguration.SQL_SESSION_FACTORY)
  15. public class OneDataSourcesConfiguration {
  16. public static final String DATABASE_PREFIX = "spring.datasource.dynamic.datasource.one.";
  17. public static final String DATA_SOURCE_NAME = "oneDataSource";
  18. public static final String SQL_SESSION_FACTORY = "oneSqlSessionFactory";
  19. /**
  20. * 通过配置文件创建DataSource,一个数据库对应一个DataSource
  21. *
  22. * @param environment 环境变量,spring-boot会自动将IOC中的environment实例设置给本参数值
  23. * 由于IOC中有多个DataSource实例,必须给其中一个实例加上@Primary
  24. */
  25. @Primary
  26. @Bean(DATA_SOURCE_NAME)
  27. public DataSource dataSource(Environment environment) {
  28. log.info("initialize the one database...");
  29. return AtomikosDataSourceCreator.createAtomikosDataSourceBean(environment, DATA_SOURCE_NAME, DATABASE_PREFIX);
  30. }
  31. /**
  32. * 通过dataSource创建SqlSessionFactory
  33. * 由于IOC中有多个DataSource实例,必须给其中一个实例加上@Primary
  34. */
  35. @Primary
  36. @Bean(name = SQL_SESSION_FACTORY)
  37. public SqlSessionFactory sqlSessionFactory(@Qualifier(DATA_SOURCE_NAME) DataSource dataSource) throws Exception {
  38. return AtomikosDataSourceCreator.createSqlSessionFactory(dataSource);
  39. }
  40. }

 

创建 two 数据源

  1. package com.jta.demo.common.dynamic.datasource;
  2. import com.jta.demo.common.dynamic.AtomikosDataSourceCreator;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.ibatis.session.SqlSessionFactory;
  5. import org.mybatis.spring.annotation.MapperScan;
  6. import org.springframework.beans.factory.annotation.Qualifier;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.core.env.Environment;
  10. import javax.sql.DataSource;
  11. @Slf4j
  12. @Configuration
  13. @MapperScan(basePackages = "com.jta.demo.mappers.two.mapper", sqlSessionFactoryRef = TwoDataSourcesConfiguration.SQL_SESSION_FACTORY)
  14. public class TwoDataSourcesConfiguration {
  15. public static final String DATABASE_PREFIX = "spring.datasource.dynamic.datasource.two.";
  16. public static final String DATA_SOURCE_NAME = "twoDataSource";
  17. public static final String SQL_SESSION_FACTORY = "twoSqlSessionFactory";
  18. @Bean(DATA_SOURCE_NAME)
  19. public DataSource dataSource(Environment environment) {
  20. log.info("initialize the two database...");
  21. return AtomikosDataSourceCreator.createAtomikosDataSourceBean(environment, DATA_SOURCE_NAME, DATABASE_PREFIX);
  22. }
  23. @Bean(name = SQL_SESSION_FACTORY)
  24. public SqlSessionFactory sqlSessionFactory(@Qualifier(DATA_SOURCE_NAME) DataSource dataSource) throws Exception {
  25. return AtomikosDataSourceCreator.createSqlSessionFactory(dataSource);
  26. }
  27. }

分布式事务配置

  1. package com.jta.demo.common.dynamic;
  2. import com.atomikos.icatch.jta.UserTransactionImp;
  3. import com.atomikos.icatch.jta.UserTransactionManager;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.transaction.PlatformTransactionManager;
  8. import org.springframework.transaction.annotation.EnableTransactionManagement;
  9. import org.springframework.transaction.jta.JtaTransactionManager;
  10. import javax.transaction.TransactionManager;
  11. import javax.transaction.UserTransaction;
  12. /**
  13. * Title: 分布式事务配置
  14. * Description:
  15. */
  16. @Configuration
  17. @EnableTransactionManagement
  18. public class TransactionManagerConfiguration {
  19. /**
  20. * 初始化JTA事务管理器
  21. */
  22. @Bean(name = "userTransaction")
  23. // TODO 知识点,有兴趣可以了解以下 @SneakyThrows(Exception.class)
  24. // @SneakyThrows(Exception.class)
  25. public UserTransaction userTransaction() throws Throwable {
  26. UserTransactionImp userTransactionImp = new UserTransactionImp();
  27. userTransactionImp.setTransactionTimeout(10000);
  28. return userTransactionImp;
  29. }
  30. /**
  31. * 初始化Atomikos事务管理器
  32. */
  33. @Bean(name = "atomikosTransactionManager")
  34. public TransactionManager atomikosTransactionManager() throws Throwable {
  35. UserTransactionManager userTransactionManager = new UserTransactionManager();
  36. userTransactionManager.setForceShutdown(false);
  37. userTransactionManager.setTransactionTimeout(999999999);
  38. return userTransactionManager;
  39. }
  40. /**
  41. * 加载事务管理
  42. */
  43. @Bean(name = "transactionManager")
  44. public PlatformTransactionManager transactionManager(@Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager, @Qualifier("userTransaction") UserTransaction userTransaction) throws Throwable {
  45. return new JtaTransactionManager(userTransaction(), atomikosTransactionManager());
  46. }
  47. }

到此多数据源事务已经配置完成,接下来需要完成业务代码即可。

 

注意

在使用JTA处理多数据源事务时,在执行业务逻辑时间较长时会出现事务超时的问题。

常见下异常如下:

  1. Transaction 127.0.0.1.tm151796505627700002 has timed out and will rollback.
  2. nested exception is javax.transaction.RollbackException: Prepare: NO vote

由于JTA事务默认超时时间是100000毫秒,超过这个时间,提交事务就会抛出异常。此时需要增加一个 jta.properties 配置文件。

修改事务默认超时时间。

  1. # SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE
  2. # THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER
  3. # UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;
  4. # Required: factory implementation class of the transaction core.
  5. # NOTE: there is no default for this, so it MUST be specified!
  6. #
  7. com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
  8. #com.atomikos.icatch.max_timeout=2000
  9. # Set base name of file where messages are output
  10. # (also known as the 'console file').
  11. #
  12. # com.atomikos.icatch.console_file_name = tm.out
  13. # Size limit (in bytes) for the console file;
  14. # negative means unlimited.
  15. #
  16. # com.atomikos.icatch.console_file_limit=-1
  17. # For size-limited console files, this option
  18. # specifies a number of rotating files to
  19. # maintain.
  20. #
  21. # com.atomikos.icatch.console_file_count=1
  22. # Set the number of log writes between checkpoints
  23. #
  24. # com.atomikos.icatch.checkpoint_interval=500
  25. # Set output directory where console file and other files are to be put
  26. # make sure this directory exists!
  27. #
  28. # com.atomikos.icatch.output_dir = ./
  29. # Set directory of log files; make sure this directory exists!
  30. #
  31. # com.atomikos.icatch.log_base_dir = ./
  32. # Set base name of log file
  33. # this name will be used as the first part of
  34. # the system-generated log file name
  35. #
  36. # com.atomikos.icatch.log_base_name = tmlog
  37. # Set the max number of active local transactions
  38. # or -1 for unlimited.
  39. #
  40. # com.atomikos.icatch.max_actives = 50
  41. # Set the default timeout (in milliseconds) for local transactions
  42. #
  43. # com.atomikos.icatch.default_jta_timeout = 10000
  44. # Set the max timeout (in milliseconds) for local transactions
  45. #
  46. # com.atomikos.icatch.max_timeout = 300000
  47. # The globally unique name of this transaction manager process
  48. # override this value with a globally unique name
  49. #
  50. # com.atomikos.icatch.tm_unique_name = tm
  51. # Do we want to use parallel subtransactions? JTA's default
  52. # is NO for J2EE compatibility
  53. #
  54. #com.atomikos.icatch.serial_jta_transactions=false
  55. # If you want to do explicit resource registration then
  56. # you need to set this value to false.
  57. #
  58. # com.atomikos.icatch.automatic_resource_registration=true
  59. # Set this to WARN, INFO or DEBUG to control the granularity
  60. # of output to the console file.
  61. #
  62. # com.atomikos.icatch.console_log_level=WARN
  63. # Do you want transaction logging to be enabled or not?
  64. # If set to false, then no logging overhead will be done
  65. # at the risk of losing data after restart or crash.
  66. #
  67. # com.atomikos.icatch.enable_logging=true
  68. # Should two-phase commit be done in (multi-)threaded mode or not?
  69. # Set this to false if you want commits to be ordered according
  70. # to the order in which resources are added to the transaction.
  71. #
  72. # NOTE: threads are reused on JDK 1.5 or higher.
  73. # For JDK 1.4, thread reuse is enabled as soon as the
  74. # concurrent backport is in the classpath - see
  75. # http://mirrors.ibiblio.org/pub/mirrors/maven2/backport-util-concurrent/backport-util-concurrent/
  76. #
  77. # com.atomikos.icatch.threaded_2pc=false
  78. # Should shutdown of the VM trigger shutdown of the transaction core too?
  79. #
  80. # com.atomikos.icatch.force_shutdown_on_vm_exit=false
  81. # 以上是完整的配置
  82. #
  83. # 配置最大的事务活动个数,-1代表无限制
  84. com.atomikos.icatch.max_actives = -1
  85. # 默认超时时间,单位:毫秒
  86. com.atomikos.icatch.default_jta_timeout = 3000000
  87. # 默认最大超时时间,单位:毫秒
  88. com.atomikos.icatch.max_timeout = 600000

 

 

 

 

 

 

 

 

—————————
如有不足请留言指正
相互学习,共同进步

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/346918
推荐阅读
相关标签
  

闽ICP备14008679号