当前位置:   article > 正文

【十九】Spring Boot之分布式事务(JTA、Atomikos、Druid、Mybatis)_springboot 分布式事务

springboot 分布式事务

一、介绍

1.分布式、本地事务

1.本地事务:普通事务。只能保证在一个数据库上的操作ACID。

JDBC事务就是本地事务,通过connection对象管理。

2.分布式事务:两个及以上数据库源的事务(由每台数据库的本地事务组成的), 使事务可以跨越多个数据库。比如,A库的a1表和B库的b1表,在一个事务中,如果B库的b1表回滚了,A库的a1表也要回滚。

JTA事务支持分布式事务。JTA指Java事务API(Java Transaction API),它本身只是为事务管理提供了接口,Atomikos是其中一种实现。 

2.分布式事务实现方案

1.资源层的分布式事务代表JTA,钢性事务,强一致性,跟交易、结算、钱有关的,对数据一致性要求高的可以考虑用这个。

2.服务层分布式事务:代表TCC,柔性事务,弱一致性。

3.钢性事务、强一致性满足四个原则ACID:

1.原子性(Atomicity):即事务是不可分割的最小工作单元,事务内的操作要么全做,要么全不做。

2.一致性(Consistency):在事务执行前数据库的数据处于正确的状态,而事务执行完成后数据库的数据还是应该处于正确的状态,即数据完整性约束没有被破坏;如银行转帐,A转帐给B,必须保证A的钱一定转给B,一定不会出现A的钱转了但B没收到,否则数据库的数据就处于不一致(不正确)的状态。

3.隔离性(Isolation):并发事务执行之间互不影响,在一个事务内部的操作对其他事务是不产生影响,这需要事务隔离级别来指定隔离性。

4.持久性(Durability):事务一旦执行成功,它对数据库的数据的改变必须是永久的,不会因比如遇到系统故障或断电造成数据不一致或丢失。

关于事务的隔离机制,详细解释请看

【十六】Spring Boot之事务(事务传播机制、嵌套事务、事务隔离机制详解) 

4.柔性事务、弱一致性介绍

4.1.CAP理论

CAP理论:在一个分布式系统中,最多只能满足C、A、P中的2个。

CAP含义:

C:Consistency 一致性:同一数据的多个副本是否实时相同。

A:Availability 可用性:一定时间内,系统能返回一个明确的结果 则称为该系统可用。

P:Partition tolerance 分区容错性:将同一服务分布在多个系统中,从而保证某一个系统宕机,仍然有其他系统提供相同的服务。

而通常情况下,我们都必须要满足AP,所以只能牺牲C。

牺牲一致性换取可用性和分区容错性。

牺牲一致性的意思是,把强一致换成弱一致。只要数据最终能一致就好了,并不要实时一致。

4.2.BASE理论

主要就是分布式系统中最CAP怎么取舍怎么平衡的一个理论

BA:Basic Available 基本可用  一定时间内能够返回一个明确的结果。

基本可用BA和高可用HA的区别是:

1.响应时间可以更长。

2.给部分用户返回一个降级页面。返回降级页面仍然是返回明确结果。

S:Soft State:柔性状态。同一数据的不同副本的状态,不用实时一致。

E:Eventual Consisstency:最终一致性。 同一数据的不同副本的状态,不用实时一致,但一定要保证经过一定时间后最终是一致的。

二、DTP分布式事务模型

2.1 JTA(XA)协议

分布式事务的规范

XA规范主要定义了:事务管理器(Transaction Manager)资源管理器(Resource Manager)之间的接口。

XA接口是双向的系统接口。在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。

在分布式系统中,从理论上讲,两台机器无法达到一致的状态,需要引入一个单点进行协调。

XA中大致分为两个部分:

1.事务管理器(Transaction Manager),全局事务管理器。

事务管理器控制着全局事务,管理事务生命周期,并协调资源。负责各个本地资源的提交和回滚。

2.资源管理器(Resource Manager),可以理解成是一个局部的事务管理器,一个本地事务管理器或者消息队列。

 

2.2 XA支持二阶段提交 2PC

分布式事务之2pc、3pc

2.3 三阶段提交 3PC

 

分布式事务之2pc、3pc

三、Spring Boot+JTA+Atomikos+Druid+Mybatis使用

场景:现有两个不同的数据库,一个叫sid,一个叫lee。操作sid库中是账户余额表,lee库中是支出金额表。

一个比支出操作,要同时更新sid的账户余额表和lee的支出金额表。失败,两个一起回滚。

项目目录

pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.sid</groupId>
  7. <artifactId>jta-atomikos</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <parent>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-parent</artifactId>
  12. <version>1.5.8.RELEASE</version>
  13. <relativePath/>
  14. </parent>
  15. <properties>
  16. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  17. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  18. <java.version>1.8</java.version>
  19. </properties>
  20. <dependencies>
  21. <!-- spring-boot的web启动的jar包 -->
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter-web</artifactId>
  25. </dependency>
  26. <!-- mybatis -->
  27. <dependency>
  28. <groupId>org.mybatis.spring.boot</groupId>
  29. <artifactId>mybatis-spring-boot-starter</artifactId>
  30. <version>1.3.2</version>
  31. </dependency>
  32. <!-- mysql数据库连接包-->
  33. <dependency>
  34. <groupId>mysql</groupId>
  35. <artifactId>mysql-connector-java</artifactId>
  36. <version>5.1.38</version>
  37. </dependency>
  38. <!-- alibaba的druid数据库连接池 -->
  39. <dependency>
  40. <groupId>com.alibaba</groupId>
  41. <artifactId>druid-spring-boot-starter</artifactId>
  42. <version>1.1.9</version>
  43. </dependency>
  44. <!-- jta-atomikos 分布式事务管理 -->
  45. <dependency>
  46. <groupId>org.springframework.boot</groupId>
  47. <artifactId>spring-boot-starter-jta-atomikos</artifactId>
  48. </dependency>
  49. <dependency>
  50. <groupId>org.projectlombok</groupId>
  51. <artifactId>lombok</artifactId>
  52. <version>1.16.14</version>
  53. </dependency>
  54. </dependencies>
  55. <build>
  56. <plugins>
  57. <plugin>
  58. <groupId>org.springframework.boot</groupId>
  59. <artifactId>spring-boot-maven-plugin</artifactId>
  60. </plugin>
  61. <!-- mybatis generator 自动生成代码插件 -->
  62. <plugin>
  63. <groupId>org.mybatis.generator</groupId>
  64. <artifactId>mybatis-generator-maven-plugin</artifactId>
  65. <version>1.3.2</version>
  66. <configuration>
  67. <configurationFile>${basedir}/src/main/resources/generator/generatorConfig.xml</configurationFile>
  68. <overwrite>true</overwrite>
  69. <verbose>true</verbose>
  70. </configuration>
  71. </plugin>
  72. </plugins>
  73. </build>
  74. </project>

application.yml

  1. server:
  2. port: 8080
  3. context-path: /sid
  4. spring:
  5. datasource:
  6. druid:
  7. one: #数据源1
  8. driver-class-name: com.mysql.jdbc.Driver
  9. url: jdbc:mysql://localhost:3306/sid
  10. username: root
  11. password: root
  12. #初始化时建立物理连接的个数
  13. initialSize: 1
  14. #池中最大连接数
  15. maxActive: 20
  16. #最小空闲连接
  17. minIdle: 1
  18. #获取连接时最大等待时间,单位毫秒
  19. maxWait: 60000
  20. #有两个含义:
  21. #1) Destroy线程会检测连接的间隔时间,如果连接空闲时间大于等于minEvictableIdleTimeMillis则关闭物理连接。
  22. #2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明
  23. timeBetweenEvictionRunsMillis: 60000
  24. #连接保持空闲而不被驱逐的最小时间,单位是毫秒
  25. minEvictableIdleTimeMillis: 300000
  26. #使用该SQL语句检查链接是否可用。如果validationQuery=null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
  27. validationQuery: SELECT 1 FROM DUAL
  28. #建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
  29. testWhileIdle: true
  30. #申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。
  31. testOnBorrow: false
  32. #归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。
  33. testOnReturn: false
  34. # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
  35. filters: stat,wall,slf4j
  36. # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
  37. #connectionProperties.druid.stat.mergeSql: true
  38. #connectionProperties.druid.stat.slowSqlMillis: 5000
  39. # 合并多个DruidDataSource的监控数据
  40. #useGlobalDataSourceStat: true
  41. #default-auto-commit: true 默认
  42. #default-auto-commit: false
  43. two: #数据源2
  44. driver-class-name: com.mysql.jdbc.Driver
  45. url: jdbc:mysql://localhost:3306/lee
  46. username: root
  47. password: root
  48. #初始化时建立物理连接的个数
  49. initialSize: 1
  50. #池中最大连接数
  51. maxActive: 20
  52. #最小空闲连接
  53. minIdle: 1
  54. #获取连接时最大等待时间,单位毫秒
  55. maxWait: 60000
  56. #有两个含义:
  57. #1) Destroy线程会检测连接的间隔时间,如果连接空闲时间大于等于minEvictableIdleTimeMillis则关闭物理连接。
  58. #2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明
  59. timeBetweenEvictionRunsMillis: 60000
  60. #连接保持空闲而不被驱逐的最小时间,单位是毫秒
  61. minEvictableIdleTimeMillis: 300000
  62. #使用该SQL语句检查链接是否可用。如果validationQuery=null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
  63. validationQuery: SELECT 1 FROM DUAL
  64. #建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
  65. testWhileIdle: true
  66. #申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。
  67. testOnBorrow: false
  68. #归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。
  69. testOnReturn: false
  70. # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
  71. filters: stat,wall,slf4j
  72. # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
  73. #connectionProperties.druid.stat.mergeSql: true
  74. #connectionProperties.druid.stat.slowSqlMillis: 5000
  75. # 合并多个DruidDataSource的监控数据
  76. #useGlobalDataSourceStat: true
  77. #default-auto-commit: true 默认
  78. #default-auto-commit: false
  79. ## 该配置节点为独立的节点,不是在在spring的节点下
  80. mybatis:
  81. mapper-locations: classpath:mapping/*/*.xml #注意:一定要对应mapper映射xml文件的所在路径
  82. type-aliases-package: com.sid.model # 注意:对应实体类的路径
  83. configuration:
  84. log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #控制台打印sql

启动类

  1. @SpringBootApplication
  2. @MapperScan("com.sid.mapper.*.*")
  3. public class App {
  4. public static void main(String[] args) {
  5. SpringApplication.run(App.class, args);
  6. }
  7. }

第一个数据源配置Properties

  1. @Data
  2. @Component
  3. @ConfigurationProperties(prefix = "spring.datasource.druid.one")
  4. public class OneDataSourceProperties {
  5. private String driverClassName;
  6. private String url;
  7. private String username;
  8. private String password;
  9. private Integer initialSize;
  10. private Integer maxActive;
  11. private Integer minIdle;
  12. private Integer maxWait;
  13. private Integer timeBetweenEvictionRunsMillis;
  14. private Integer minEvictableIdleTimeMillis;
  15. private String validationQuery;
  16. private Boolean testWhileIdle;
  17. private Boolean testOnBorrow;
  18. private Boolean testOnReturn;
  19. private String filters;
  20. }

第二个数据源配置Properties

  1. @Data
  2. @Component
  3. @ConfigurationProperties(prefix = "spring.datasource.druid.two")
  4. public class TwoDataSourceProperties {
  5. private String driverClassName;
  6. private String url;
  7. private String username;
  8. private String password;
  9. private Integer initialSize;
  10. private Integer maxActive;
  11. private Integer minIdle;
  12. private Integer maxWait;
  13. private Integer timeBetweenEvictionRunsMillis;
  14. private Integer minEvictableIdleTimeMillis;
  15. private String validationQuery;
  16. private Boolean testWhileIdle;
  17. private Boolean testOnBorrow;
  18. private Boolean testOnReturn;
  19. private String filters;
  20. }

第一个数据源配置

  1. @Configuration
  2. //这里要指明这个数据适用于哪些mapper,和这个数据源的sqlsessionFactory
  3. @MapperScan(basePackages = "com.sid.mapper.sid", sqlSessionFactoryRef = "oneSqlSessionFactory")
  4. public class OneDataSourceConfiguration {
  5. @Autowired
  6. public OneDataSourceProperties oneDataSourceProperties;
  7. //配置第一个数据源
  8. @Primary
  9. @Bean(name = "oneDataSource")
  10. public DataSource oneDataSource() {
  11. // 这里datasource要使用阿里的支持XA的DruidXADataSource
  12. DruidXADataSource datasource = new DruidXADataSource();
  13. BeanUtils.copyProperties(oneDataSourceProperties,datasource);
  14. AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
  15. xaDataSource.setXaDataSource(datasource);
  16. xaDataSource.setUniqueResourceName("oneDataSource");
  17. return xaDataSource;
  18. }
  19. //配置第一个sqlsessionFactory
  20. @Primary
  21. @Bean(name = "oneSqlSessionFactory")
  22. public SqlSessionFactory oneSqlSessionFactory(@Qualifier("oneDataSource") DataSource oneDataSource)
  23. throws Exception {
  24. SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
  25. bean.setDataSource(oneDataSource);
  26. ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
  27. bean.setMapperLocations(resolver.getResources("classpath:mapping/sid/*.xml"));
  28. return bean.getObject();
  29. }
  30. }

第二个数据源配置

  1. @Configuration
  2. @MapperScan(basePackages = "com.sid.mapper.lee", sqlSessionFactoryRef = "twoSqlSessionFactory")
  3. public class TwoDataSourceConfiguration {
  4. @Autowired
  5. public TwoDataSourceProperties twoDataSourceProperties;
  6. @Bean(name = "twoDataSource")
  7. public DataSource twoDataSource() {
  8. DruidXADataSource datasource = new DruidXADataSource();
  9. BeanUtils.copyProperties(twoDataSourceProperties,datasource);
  10. AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
  11. xaDataSource.setXaDataSource(datasource);
  12. xaDataSource.setUniqueResourceName("twoDataSource");
  13. return xaDataSource;
  14. }
  15. @Bean(name = "twoSqlSessionFactory")
  16. public SqlSessionFactory twoSqlSessionFactory(@Qualifier("twoDataSource") DataSource twoDataSource)
  17. throws Exception {
  18. SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
  19. bean.setDataSource(twoDataSource);
  20. ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
  21. bean.setMapperLocations(resolver.getResources("classpath:mapping/lee/*.xml"));
  22. return bean.getObject();
  23. }
  24. }

第一个数据源mapper

  1. @Mapper
  2. public interface AccountMapper {
  3. int deleteByPrimaryKey(Integer id);
  4. int insert(Account record);
  5. int insertSelective(Account record);
  6. Account selectByPrimaryKey(Integer id);
  7. int updateByPrimaryKeySelective(Account record);
  8. int updateByPrimaryKey(Account record);
  9. }

mapping.xml就不贴了,就是Mybatis的插件生生了 

第二个数据源mapper

  1. @Mapper
  2. public interface ExpenditureMapper {
  3. int deleteByPrimaryKey(Integer id);
  4. int insert(Expenditure record);
  5. int insertSelective(Expenditure record);
  6. Expenditure selectByPrimaryKey(Integer id);
  7. int updateByPrimaryKeySelective(Expenditure record);
  8. int updateByPrimaryKey(Expenditure record);
  9. }

model类

  1. public class Account {
  2. private Integer id;
  3. private BigDecimal accountBalance;
  4. public Integer getId() {
  5. return id;
  6. }
  7. public void setId(Integer id) {
  8. this.id = id;
  9. }
  10. public BigDecimal getAccountBalance() {
  11. return accountBalance;
  12. }
  13. public void setAccountBalance(BigDecimal accountBalance) {
  14. this.accountBalance = accountBalance;
  15. }
  16. }
  1. public class Expenditure {
  2. private Integer id;
  3. private BigDecimal money;
  4. public Integer getId() {
  5. return id;
  6. }
  7. public void setId(Integer id) {
  8. this.id = id;
  9. }
  10. public BigDecimal getMoney() {
  11. return money;
  12. }
  13. public void setMoney(BigDecimal money) {
  14. this.money = money;
  15. }
  16. }

service层使用事务回滚演示

  1. @Service
  2. public class TestServiceImpl implements TestService {
  3. @Resource
  4. private AccountMapper accountMapper;
  5. @Resource
  6. private ExpenditureMapper expenditureMapper;
  7. @Override
  8. @Transactional
  9. public String testJtaAtomikos(){
  10. Account account = new Account();
  11. account.setAccountBalance(new BigDecimal(560.56));
  12. accountMapper.insertSelective(account);
  13. Expenditure expenditure = new Expenditure();
  14. expenditure.setMoney(new BigDecimal(3.3));
  15. expenditureMapper.insertSelective(expenditure);
  16. int i = 1 / 0;
  17. return "done";
  18. }
  19. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/995389
推荐阅读
相关标签
  

闽ICP备14008679号