赞
踩
在一些复杂的应用开发中,一个应用可能需要连接多个数据源。所谓多数据源,可以简单理解为同时连接两个或两个以上的数据库。在动手之前,最好先对 JTA 有个基本了解,可参考:浅谈 JTA 事务
项目环境
项目依赖
pom.xml中关键依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <!--JTA组件核心依赖-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-jta-atomikos</artifactId>
- </dependency>
-
- <!-- mysql -->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.19</version>
- </dependency>
-
- <!-- Druid 数据源 -->
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>druid-spring-boot-starter</artifactId>
- <version>1.1.22</version>
- </dependency>
-
- <!-- mybatis plus -->
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-boot-starter</artifactId>
- <version>3.3.2</version>
- </dependency>
配置多数据源
application.yml
- spring:
- profiles:
- active: local
- application:
- name: jta-center
- datasource:
- type: com.alibaba.druid.pool.DruidDataSource
- test-query: SELECT 1
- min-pool-size: 5
- max-pool-size: 20
- max-life-time: 0
- dynamic:
- datasource:
- one:
- driver-class-name: com.mysql.cj.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
- url: jdbc:mysql://127.0.0.1:3306/one?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&serverTimezone=Asia/Shanghai
- username: root
- password: root
- two:
- driver-class-name: com.mysql.cj.jdbc.Driver
- url: jdbc:mysql://127.0.0.1:3306/tow?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&serverTimezone=Asia/Shanghai
- username: root
- password: root
配置 DataSource
创建事务数据源
- package com.jta.demo.common.dynamic;
-
- import com.baomidou.mybatisplus.core.MybatisConfiguration;
- import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
- import com.mysql.cj.jdbc.MysqlXADataSource;
- import org.apache.ibatis.session.SqlSessionFactory;
- import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
- import org.springframework.core.env.Environment;
-
- import javax.sql.DataSource;
-
- /**
-  * Static factory helpers that build the JTA-aware (Atomikos) data source and the
-  * MyBatis-Plus {@link SqlSessionFactory} used by each configured database.
-  */
- public class AtomikosDataSourceCreator {
-
-     /** Prefix of the pool settings shared by every data source. */
-     public static final String DATA_SOURCE_PREFIX = "spring.datasource.";
-
-     /**
-      * Creates an {@link AtomikosDataSourceBean} that wraps a MySQL XA data source.
-      * <p>
-      * Connection settings are read from {@code dataBase + "url" / "username" / "password" / "name"};
-      * pool settings are read from {@link #DATA_SOURCE_PREFIX}.
-      *
-      * @param environment        Spring environment the properties are resolved from
-      * @param uniqueResourceName unique resource name identifying this data source to Atomikos
-      * @param dataBase           property prefix of the target database, including the trailing dot
-      * @return the configured Atomikos data source bean
-      * @throws IllegalStateException if the {@code url} property of the target database is missing
-      */
-     public static AtomikosDataSourceBean createAtomikosDataSourceBean(Environment environment, String uniqueResourceName, String dataBase) {
-         // Vendor XA data source holding the physical connection settings.
-         MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
-
-         // Optional: resolves to null when no "<prefix>name" property is configured.
-         mysqlXaDataSource.setDatabaseName(environment.getProperty(dataBase + "name"));
-         // Fail fast with a message naming the key instead of passing a null URL downstream.
-         mysqlXaDataSource.setURL(environment.getRequiredProperty(dataBase + "url"));
-         mysqlXaDataSource.setUser(environment.getProperty(dataBase + "username"));
-         mysqlXaDataSource.setPassword(environment.getProperty(dataBase + "password"));
-
-         // Atomikos pooling wrapper that enlists connections in JTA transactions.
-         AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
-
-         // Unique identifier of this data source.
-         xaDataSource.setUniqueResourceName(uniqueResourceName);
-         // NOTE(review): an XADataSource instance is also supplied below, so this class name is
-         // presumably not used for instantiation - confirm against the Atomikos version in use.
-         xaDataSource.setXaDataSourceClassName(environment.getProperty(DATA_SOURCE_PREFIX + "type"));
-         // Explicit defaults avoid a NullPointerException from unboxing a missing property.
-         // Minimum pool size, default 1.
-         xaDataSource.setMinPoolSize(environment.getProperty(DATA_SOURCE_PREFIX + "min-pool-size", Integer.class, 1));
-         // Maximum pool size, default 1.
-         xaDataSource.setMaxPoolSize(environment.getProperty(DATA_SOURCE_PREFIX + "max-pool-size", Integer.class, 1));
-         // Maximum number of seconds a connection is kept in the pool before being destroyed;
-         // optional, default 0 (no limit).
-         xaDataSource.setMaxLifetime(environment.getProperty(DATA_SOURCE_PREFIX + "max-life-time", Integer.class, 0));
-         // SQL query used to test a connection before it is handed out.
-         xaDataSource.setTestQuery(environment.getProperty(DATA_SOURCE_PREFIX + "test-query"));
-         xaDataSource.setBorrowConnectionTimeout(60);
-         xaDataSource.setXaDataSource(mysqlXaDataSource);
-
-         return xaDataSource;
-     }
-
-
-     /**
-      * Creates a {@link SqlSessionFactory} bound to the given data source.
-      * <p>
-      * {@link MybatisSqlSessionFactoryBean} must be used rather than the plain MyBatis
-      * {@code SqlSessionFactoryBean}; otherwise "invalid bound statement (not found)" is reported.
-      * {@code MybatisPlusAutoConfiguration#sqlSessionFactory(javax.sql.DataSource)} uses the same
-      * factory bean and is annotated with {@code @ConditionalOnMissingBean}, so MyBatis-Plus does not
-      * create its own SqlSessionFactory once one already exists in the container.
-      *
-      * @param dataSource data source the session factory operates on
-      * @return the SqlSessionFactory produced by the factory bean
-      * @throws Exception if the factory bean fails to build the session factory
-      */
-     public static SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
-         MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
-         sessionFactoryBean.setDataSource(dataSource);
-
-         MybatisConfiguration configuration = new MybatisConfiguration();
-         sessionFactoryBean.setConfiguration(configuration);
-
-         return sessionFactoryBean.getObject();
-     }
- }
创建 one 数据源
-
- package com.jta.demo.common.dynamic.datasource;
-
- import com.jta.demo.common.dynamic.AtomikosDataSourceCreator;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.ibatis.session.SqlSessionFactory;
- import org.mybatis.spring.annotation.MapperScan;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
- import org.springframework.core.env.Environment;
-
- import javax.sql.DataSource;
-
-
- @Slf4j
- @Configuration
- @MapperScan(basePackages = "com.jta.demo.mappers.one.mapper", sqlSessionFactoryRef = OneDataSourcesConfiguration.SQL_SESSION_FACTORY)
- public class OneDataSourcesConfiguration {
-
- public static final String DATABASE_PREFIX = "spring.datasource.dynamic.datasource.one.";
-
- public static final String DATA_SOURCE_NAME = "oneDataSource";
- public static final String SQL_SESSION_FACTORY = "oneSqlSessionFactory";
-
-
- /**
- * 通过配置文件创建DataSource,一个数据库对应一个DataSource
- *
- * @param environment 环境变量,spring-boot会自动将IOC中的environment实例设置给本参数值
- * 由于IOC中有多个DataSource实例,必须给其中一个实例加上@Primary
- */
- @Primary
- @Bean(DATA_SOURCE_NAME)
- public DataSource dataSource(Environment environment) {
- log.info("initialize the one database...");
- return AtomikosDataSourceCreator.createAtomikosDataSourceBean(environment, DATA_SOURCE_NAME, DATABASE_PREFIX);
- }
-
- /**
- * 通过dataSource创建SqlSessionFactory
- * 由于IOC中有多个DataSource实例,必须给其中一个实例加上@Primary
- */
- @Primary
- @Bean(name = SQL_SESSION_FACTORY)
- public SqlSessionFactory sqlSessionFactory(@Qualifier(DATA_SOURCE_NAME) DataSource dataSource) throws Exception {
- return AtomikosDataSourceCreator.createSqlSessionFactory(dataSource);
- }
-
- }
创建 two 数据源
- package com.jta.demo.common.dynamic.datasource;
-
- import com.jta.demo.common.dynamic.AtomikosDataSourceCreator;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.ibatis.session.SqlSessionFactory;
- import org.mybatis.spring.annotation.MapperScan;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.core.env.Environment;
-
- import javax.sql.DataSource;
-
-
- @Slf4j
- @Configuration
- @MapperScan(basePackages = "com.jta.demo.mappers.two.mapper", sqlSessionFactoryRef = TwoDataSourcesConfiguration.SQL_SESSION_FACTORY)
- public class TwoDataSourcesConfiguration {
-
- public static final String DATABASE_PREFIX = "spring.datasource.dynamic.datasource.two.";
- public static final String DATA_SOURCE_NAME = "twoDataSource";
- public static final String SQL_SESSION_FACTORY = "twoSqlSessionFactory";
-
- @Bean(DATA_SOURCE_NAME)
- public DataSource dataSource(Environment environment) {
- log.info("initialize the two database...");
- return AtomikosDataSourceCreator.createAtomikosDataSourceBean(environment, DATA_SOURCE_NAME, DATABASE_PREFIX);
- }
-
- @Bean(name = SQL_SESSION_FACTORY)
- public SqlSessionFactory sqlSessionFactory(@Qualifier(DATA_SOURCE_NAME) DataSource dataSource) throws Exception {
- return AtomikosDataSourceCreator.createSqlSessionFactory(dataSource);
- }
- }
分布式事务配置
- package com.jta.demo.common.dynamic;
-
- import com.atomikos.icatch.jta.UserTransactionImp;
- import com.atomikos.icatch.jta.UserTransactionManager;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.transaction.PlatformTransactionManager;
- import org.springframework.transaction.annotation.EnableTransactionManagement;
- import org.springframework.transaction.jta.JtaTransactionManager;
-
- import javax.transaction.TransactionManager;
- import javax.transaction.UserTransaction;
-
- /**
-  * Distributed (JTA) transaction configuration: exposes the Atomikos user transaction and
-  * transaction manager and combines them into Spring's {@link JtaTransactionManager}.
-  */
- @Configuration
- @EnableTransactionManagement
- public class TransactionManagerConfiguration {
-
-     /**
-      * Creates the JTA {@link UserTransaction}.
-      * <p>
-      * Tip: Lombok's {@code @SneakyThrows(Exception.class)} is an alternative to declaring the
-      * checked exception, for those interested.
-      *
-      * @return the Atomikos user transaction
-      * @throws javax.transaction.SystemException if the timeout cannot be applied
-      */
-     @Bean(name = "userTransaction")
-     public UserTransaction userTransaction() throws javax.transaction.SystemException {
-         UserTransactionImp userTransactionImp = new UserTransactionImp();
-         // The JTA API defines this timeout in SECONDS, not milliseconds.
-         userTransactionImp.setTransactionTimeout(10000);
-         return userTransactionImp;
-     }
-
-     /**
-      * Creates the Atomikos transaction manager.
-      * <p>
-      * {@code init}/{@code close} are registered as lifecycle callbacks so the transaction service
-      * is started with the application context and shut down with it.
-      *
-      * @return the Atomikos transaction manager
-      * @throws javax.transaction.SystemException if the timeout cannot be applied
-      */
-     @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
-     public TransactionManager atomikosTransactionManager() throws javax.transaction.SystemException {
-         UserTransactionManager userTransactionManager = new UserTransactionManager();
-         userTransactionManager.setForceShutdown(false);
-         // The JTA API defines this timeout in SECONDS.
-         // NOTE(review): presumably still capped by com.atomikos.icatch.max_timeout - confirm.
-         userTransactionManager.setTransactionTimeout(999999999);
-         return userTransactionManager;
-     }
-
-     /**
-      * Creates the Spring transaction manager backing {@code @Transactional}.
-      * <p>
-      * Uses the injected beans instead of calling the factory methods again, so the wiring does
-      * not depend on {@code @Configuration} class proxying.
-      *
-      * @param atomikosTransactionManager the bean named "atomikosTransactionManager"
-      * @param userTransaction            the bean named "userTransaction"
-      * @return a JtaTransactionManager delegating to the Atomikos beans
-      */
-     @Bean(name = "transactionManager")
-     public PlatformTransactionManager transactionManager(@Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager, @Qualifier("userTransaction") UserTransaction userTransaction) {
-         return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
-     }
- }
到此多数据源事务已经配置完成,接下来只需完成业务代码即可。
注意
在使用JTA处理多数据源事务时,在执行业务逻辑时间较长时会出现事务超时的问题。
常见异常如下:
- Transaction 127.0.0.1.tm151796505627700002 has timed out and will rollback.
- 或
- nested exception is javax.transaction.RollbackException: Prepare: NO vote
由于 Atomikos 的 JTA 事务默认超时时间是 10000 毫秒(即 10 秒),超过这个时间,提交事务就会抛出异常。此时需要增加一个 jta.properties 配置文件。
修改事务默认超时时间。
- # SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE
- # THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER
- # UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;
-
- # Required: factory implementation class of the transaction core.
- # NOTE: there is no default for this, so it MUST be specified!
- #
- com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
- #com.atomikos.icatch.max_timeout=2000
-
- # Set base name of file where messages are output
- # (also known as the 'console file').
- #
- # com.atomikos.icatch.console_file_name = tm.out
-
- # Size limit (in bytes) for the console file;
- # negative means unlimited.
- #
- # com.atomikos.icatch.console_file_limit=-1
-
- # For size-limited console files, this option
- # specifies a number of rotating files to
- # maintain.
- #
- # com.atomikos.icatch.console_file_count=1
-
- # Set the number of log writes between checkpoints
- #
- # com.atomikos.icatch.checkpoint_interval=500
-
- # Set output directory where console file and other files are to be put
- # make sure this directory exists!
- #
- # com.atomikos.icatch.output_dir = ./
-
- # Set directory of log files; make sure this directory exists!
- #
- # com.atomikos.icatch.log_base_dir = ./
-
- # Set base name of log file
- # this name will be used as the first part of
- # the system-generated log file name
- #
- # com.atomikos.icatch.log_base_name = tmlog
-
- # Set the max number of active local transactions
- # or -1 for unlimited.
- #
- # com.atomikos.icatch.max_actives = 50
-
- # Set the default timeout (in milliseconds) for local transactions
- #
- # com.atomikos.icatch.default_jta_timeout = 10000
-
- # Set the max timeout (in milliseconds) for local transactions
- #
- # com.atomikos.icatch.max_timeout = 300000
-
- # The globally unique name of this transaction manager process
- # override this value with a globally unique name
- #
- # com.atomikos.icatch.tm_unique_name = tm
-
- # Do we want to use parallel subtransactions? JTA's default
- # is NO for J2EE compatibility
- #
- #com.atomikos.icatch.serial_jta_transactions=false
-
- # If you want to do explicit resource registration then
- # you need to set this value to false.
- #
- # com.atomikos.icatch.automatic_resource_registration=true
-
- # Set this to WARN, INFO or DEBUG to control the granularity
- # of output to the console file.
- #
- # com.atomikos.icatch.console_log_level=WARN
-
- # Do you want transaction logging to be enabled or not?
- # If set to false, then no logging overhead will be done
- # at the risk of losing data after restart or crash.
- #
- # com.atomikos.icatch.enable_logging=true
-
- # Should two-phase commit be done in (multi-)threaded mode or not?
- # Set this to false if you want commits to be ordered according
- # to the order in which resources are added to the transaction.
- #
- # NOTE: threads are reused on JDK 1.5 or higher.
- # For JDK 1.4, thread reuse is enabled as soon as the
- # concurrent backport is in the classpath - see
- # http://mirrors.ibiblio.org/pub/mirrors/maven2/backport-util-concurrent/backport-util-concurrent/
- #
- # com.atomikos.icatch.threaded_2pc=false
-
- # Should shutdown of the VM trigger shutdown of the transaction core too?
- #
- # com.atomikos.icatch.force_shutdown_on_vm_exit=false
-
- # 以上是完整的配置
- #
- # 配置最大的事务活动个数,-1代表无限制
- com.atomikos.icatch.max_actives = -1
-
- # 默认超时时间,单位:毫秒(注意:实际生效的超时时间不会超过下面的 com.atomikos.icatch.max_timeout,超出时会被截断为该最大值)
- com.atomikos.icatch.default_jta_timeout = 3000000
-
- # 默认最大超时时间,单位:毫秒
- com.atomikos.icatch.max_timeout = 600000
—————————
如有不足请留言指正
相互学习,共同进步
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。