当前位置:   article > 正文

【巨人的肩膀】Spring事务详解_propagation.not_supported

propagation.not_supported
引子

很多 coder 在不理解事务的原理甚至连基本概念都不清楚的情况下,就去使用数据库事务,是极容易出错,写出一些自己不能掌控的代码。网上很多文章要不就是概念,或者一点源码,或者一点测试验证,都不足以全面了解事务,所以本文出现了

全文基于 Mysql innodb 引擎。Mysql 官方文档 ,推荐书籍:《Mysql技术内幕-InnoDB存储引擎

1. 拜神

Spring 事务领头人叫 Juergen Hoeller,于尔根·糊了…先混个脸熟哈,他写了几乎全部的 spring 事务代码。读源码先拜神,掌握他的源码的风格,读起来会通畅很多。最后一节咱们总结下这个大神的代码风格
在这里插入图片描述

2. 初探事务
2.1 事务的定义

事务(Transaction)是数据库区别于文件系统的重要特性之一。目前国际认可的数据库设计原则是 ACID 特性,用以保证数据库事务的正确执行。Mysql 的 innodb 引擎中的事务就完全符合 ACID 特性

Spring 对于事务的支持,分层概览图如下
在这里插入图片描述

2.2 事务的ACID特性
  1. 原子性(Atomicity):一个事务必须被视为一个不可分割的最小工作单元,整个事务中的所有操作要么全部提交成功,要么全部失败回滚
    主要涉及 InnoDB 事务。相关特性:事务的提交,回滚,信息表

  2. 一致性(consistency):数据库总是从一个一致性的状态转换到另一个一致性的状态。在事务开始前后,数据库的完整性约束没有被破坏。例如违反了唯一性,必须撤销事务,返回初始状态
    主要涉及内部 InnoDB 处理,以保护数据不受崩溃,相关特性:双写缓冲、崩溃恢复

  3. 隔离性(isolation):每个读写事务的对象对其他事务的操作对象能相互分离,即:事务提交前对其他事务是不可见的,通常内部加锁实现
    主要涉及事务,尤其是事务隔离级别,相关特性:隔离级别、innodb 锁的底层实现细节

  4. 持久性(durability):一旦事务提交,则其所做的修改会永久保存到数据库
    涉及到 MySQL 软件特性与特定硬件配置的相互影响,相关特性:4个配置项:双写缓冲开关、事务提交刷新 log 的级别、binlog 同步频率、表文件;写缓存、操作系统对于 fsync() 的支持、备份策略等

2.3 事务的属性

要保证事务的 ACID 特性,spring 给事务定义了 6 个属性,对应于声明式事务注解(org.springframework.transaction.annotation.Transactional)@Transactional(key1=,key2=…)

事务名称:用户可手动指定事务的名称,当多个事务的时候,可区分使用哪个事务。对应注解中的属性 value、transactionManager
隔离级别: 为了解决数据库容易出现的问题,分级加锁处理策略。 对应注解中的属性 isolation
超时时间: 定义一个事务执行过程多久算超时,以便超时后回滚。可以防止长期运行的事务占用资源。对应注解中的属性 timeout
是否只读:表示这个事务只读取数据但不更新数据,这样可以帮助数据库引擎优化事务。对应注解中的属性 readOnly
传播机制: 对事务的传播特性进行定义,共有7种类型。对应注解中的属性 propagation
回滚机制:定义遇到异常时回滚策略。对应注解中的属性 rollbackFor、noRollbackFor、rollbackForClassName、noRollbackForClassName
其中隔离级别和传播机制比较复杂,咱们细细地品一品

2.3.1 隔离级别

这一块比较复杂,我们从3个角度来看:3种错误现象、mysql的底层技术支持、分级处理策略。这一小节一定要好好看,已经开始涉及核心原理了

  1. 现象(三种问题)
    **脏读(Drity Read):**事务A更新记录但未提交,事务B查询出A未提交记录
    不可重复读(Non-repeatable read): 事务A读取一次,此时事务B对数据进行了更新或删除操作,事务A再次查询数据不一致
    幻读(Phantom Read): 事务A读取一次,此时事务B插入一条数据事务A再次查询,记录多了

  2. mysql的底层支持(IndoDB事务模型)(一致性非锁定读VS锁定读)

官网飞机票:InnoDB Transaction Model

两种读:在 MVCC 中,读操作可以分成两类,快照读(Snapshot read)和当前读(current read)。

快照读:
普通的select
当前读:
select * from table where ? lock in share mode; (加S锁)
select * from table where ? for update; (加X锁)
insert, update, delete 操作前会先进行一次当前读(加X锁)
其中前两种锁定读,需要用户自己显式使用,最后一种是自动添加的

一致性非锁定读(快照读-consistent nonlocking read):是指 InnoDB 存储引擎通过多版本控制(multi versionning)的方式来读取当前执行时间数据库中行的数据,如果读取的行正在执行 DELETE 或 UPDAT 操作,这是读取操作不会因此等待行上锁的释放。相反的,InnoDB 会去读取行的一个快照数据
在这里插入图片描述
上面展示了 InnoDB 存储引擎一致性的非锁定读。之所以称为非锁定读,因为不需要等待访问的行上 X 锁的释放。快照数据是指该行之前版本的数据,该实现是通过 undo 段来完成。而 undo 用来事务中的回滚数据,因此快照数据本身没有额外的开销,此外,读取快照数据不需要上锁,因为没有事务需要对历史数据进行修改操作

锁定读(当前读):InnoDB 对 select语 句支持两种锁定读:(1):SELECT…FOR UPDATE 对读取的行加排它锁(X锁),其他事务不能对已锁定的行再加任何锁;(2):SELECT…LOCK IN SHARE MODE 对读取的行加共享锁(S锁),其他事务可以再加 S 锁,X 锁会阻塞等待

注:这两种锁都必须处于事务中,事务 commit,锁释放。所以必须 begin 或者 start transaction 开启一个事务或者索性 set autocommit=0 把自动提交关掉(mysql默认是1,即执行完sql立即提交)

  1. 分级处理策略(四种隔离级别)

官网描述:InnoDB 使用不同的锁定策略支持每个事务隔离级别。对于关键数据的操作(遵从ACID原则),您可以使用强一致性(默认Repeatable Read)。对于不是那么重要的数据操作,可以使用 Read Committed/Read Uncommitted。Serializable 执行比可重读更严格的规则,用于特殊场景:XA事务,并发性和死锁问题的故障排除

四种隔离级别:
1. Read Uncommitted(读取未提交内容):可能读取其它事务未提交的数据。不能解决:脏读、不可重复读、幻读
2. Read Committed(读取提交内容):一个事务只能看见已经提交事务所做的改变。不能解决:不可重复读、幻读
select…from : 一致性非锁定读的数据快照(MVCC)是最新版本的,但其他事务可能会有新的 commit,所以同一 select 可能返回不同结果
select…from for update : record lock行级锁
3. Repeatable Read(可重读):不能解决:幻读
select…from:同一事务内多次一致性非锁定读,取第一次读取时建立的快照版本(MVCC),保证了同一事务内部的可重复读.—狭义的幻读问题得到解决。(Db插入了数据,只不过读不到)
select…from for update (FOR UPDATE or LOCK IN SHARE MODE), UPDATE, 和 DELETE : next-key lock下一键锁
对于具有唯一搜索条件的唯一索引,InnoDB 只锁定找到的索引记录(next-key lock 降为record lock)
对于其他非索引或者非唯一索引,InnoDB 会对扫描的索引范围进行锁定,使用 next-key locks,阻塞其他session 对间隙的 insert 操作,-彻底解决广义的幻读问题。(DB没插入数据)
4. Serializable(可串行化):这是最高的隔离级别,它是在每个读的数据行上加上共享锁(LOCK IN SHARE MODE)。在这个级别,可能导致大量的超时现象和锁竞争,主要用于分布式事务
在这里插入图片描述

2.3.2 传播机制

org.springframework.transaction 包下有一个事务定义接口 TransactionDefinition,定义了 7 种事务传播机制,很多人对传播机制的曲解从概念开始,所以特地翻译了一下源码注释如下:
1. PROPAGATION_REQUIRED:支持当前事务;如果不存在,创建一个新的。类似于同名的 EJB 事务属性。这通常是事务定义的默认设置,通常定义事务同步作用域。存在一个事务:1.外部有事务加入,异常回滚;2.外部没事务创建新事务,异常回滚
2. PROPAGATION_SUPPORTS:支持当前事务:如果不存在事务,则以非事务方式执行。类似于同名的EJB事务属性。注意:对于具有事务同步的事务管理器,PROPAGATION_SUPPORTS 与没有事务稍有不同,因为它可能在事务范围内定义了同步。因此,相同的资源(JDBC的Connection、Hibernate的Session等)将在整个指定范围内共享。注意,确切的行为取决于事务管理器的实际同步配置!
小心使用 PROPAGATION_SUPPORTS! 特别是,不要依赖 PROPAGATION_REQUIRED 或 PROPAGATION_REQUIRES_NEW,在 PROPAGATION_SUPPORTS 范围内(这可能导致运行时的同步冲突)。如果这种嵌套不可避免,请确保适当地配置事务管理器(通常切换到“实际事务上的同步”)。最多只存在一个事务: 1.外部有事务加入,异常回滚;2.外部没事务,内部非事务,异常不回滚
3. PROPAGATION_MANDATORY:支持当前事务;如果当前事务不存在,抛出异常。类似于同名的EJB事务属性。注意:PROPAGATION_MANDATORY范围内的事务同步总是由周围的事务驱动。最多只存在一个事务: 1.外部存在事务加入,异常回滚;2.外部不存在事务,异常无法回滚
4. PROPAGATION_REQUIRES_NEW:创建一个新事务,如果存在当前事务,则挂起当前事务。类似于同名的 EJB 事务属性。注意:实际事务挂起不会在所有事务管理器上开箱即用。这一点特别适用于 JtaTransactionManager,它需要 TransactionManager 的支持。
PROPAGATION_REQUIRES_NEW 范围总是定义自己的事务同步。现有同步将被挂起并适当地恢复。可能存在1-2个事务:1.外部存在事务挂起,创建新,异常回滚自己的事务 2.外部不存在事务,创建新, 异常只回滚新事务
5. PROPAGATION_NOT_SUPPORTED:不支持当前事务,存在事务挂起当前事务;始终以非事务方式执行。类似于同名的 EJB 事务属性。注意:实际事务挂起不会在所有事务管理器上开箱即用。这一点特别适用于 JtaTransactionManager,它需要 TransactionManager 的支持。事务同步在 PROPAGATION_NOT_SUPPORTED 范围内是不可用的。现有同步将被挂起并适当地恢复。最多只存在一个事务:1. 外部有事务挂起,外部异常回滚;内部非事务,异常不回滚2.外部无事务,内部非事务,异常不回滚
6. PROPAGATION_NEVER:不支持当前事务;如果当前事务存在,抛出异常。类似于同名的 EJB 事务属性。注意:事务同步在 PROPAGATION_NEVER 范围内不可用。最多只存在一个事务:1.外部有事务,外部异常回滚;内部非事务不回滚 2.外部非事务,内部非事务,异常不回滚
7. PROPAGATION_NESTED:如果当前事务存在,则在嵌套事务中执行,如果当前没有事务,类似 PROPAGATION_REQUIRED(创建一个新的)。EJB 中没有类似的功能。注意:实际创建嵌套事务只对特定的事务管理器有效。开箱即用,这只适用于 DataSourceTransactionManager(JDBC 3.0驱动)。一些JTA提供者也可能支持嵌套事务。存在一个事务:1. 外部有事务,嵌套事务创建保存点,外部异常回滚全部事务;内部嵌套事务异常回滚到保存点;2.外部不存在事务,内部创建新事务,内部异常回滚

3 简单样例

在 Spring 中,事务有两种实现方式:

  1. 编程式事务管理: 编程式事务管理使用底层源码可实现更细粒度的事务控制。Spring 推荐使用 TransactionTemplate,典型的模板模式
  2. 申明式事务管理: 添加 @Transactional 注解,并定义传播机制+回滚策略。基于 Spring AOP 实现,本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务
3.1 简单样例

**需求:**创建用户时,新建一个用户余额表。如果用户余额创建失败抛出异常,那么用户表也回滚,即要保证 “新增用户+新增用户余额” 一起成功或回滚

3.2 申明式事务管理

如下图,只需要在 service.impl 层,业务方法上添加 @Transactional 注解,定义事务的传播机制为 REQUIRED(不写这个参数,默认就是REQUIRED),遇到 Exception 异常就一起回滚

REQUIRED 传播机制下:存在加入事务,不存在创建新事务。保证了当前方法中的所有数据库操作都在一个物理事务中,当遇到异常时会整个业务方法一起回滚

 /**
     * 创建用户并创建账户余额
      *
      * @param name
      * @param balance
      * @return
      */
     @Transactional(propagation= Propagation.REQUIRED, rollbackFor = Exception.class)
     @Override
     public void addUserBalanceAndUser(String name, BigDecimal balance) {
         log.info("[addUserBalanceAndUser] begin!!!");
         //1.新增用户
         userService.addUser(name);
         //2.新增用户余额
         UserBalance userBalance = new UserBalance();
         userBalance.setName(name);
         userBalance.setBalance(new BigDecimal(1000));
         this.addUserBalance(userBalance);
         log.info("[addUserBalanceAndUser] end!!!");
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
3.3 编程式事务管理

编程式事务管理,我们使用 Spring 推荐的 transactionTemplate。我这里因为使用的是 spring cloud 的注解配置,实现用了自动配置类配置好了 TransactionTemplate 这个类型的 bean。使用的时候直接注入 bean 使用即可(当然老式的xml配置也是一样的)。如下

 /**
      * 创建用户并创建账户余额(手动事务,不带结果)
      *
      * @param name
      * @param balance
      * @return
      */
     @Override
     public void addUserBalanceAndUserWithinTT(String name, BigDecimal balance) {
         //实现一个没有返回值的事务回调
         transactionTemplate.execute(new TransactionCallbackWithoutResult() {
             @Override
             protected void doInTransactionWithoutResult(TransactionStatus status) {
                 try {
                     log.info("[addUserBalanceAndUser] begin!!!");
 
                     //1.新增用户
                     userService.addUser(name);
                     //2.新增用户余额
                     UserBalance userBalance = new UserBalance();
                     userBalance.setName(name);
                     userBalance.setBalance(new BigDecimal(1000));
                     userBalanceRepository.insert(userBalance);
                     log.info("[addUserBalanceAndUser] end!!!");
                     //注意:这里catch住异常后,设置setRollbackOnly,否则事务不会滚。当然如果不需要自行处理异常,就不要catch了
                 } catch (Exception e) {
                     // 异常回滚
                     status.setRollbackOnly();
                     log.error("异常回滚!,e={}",e);
                 }
 
             }
         });
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

注意:

  1. 可以不用 try catch,transactionTemplate.execute 自己会捕捉异常并回滚
  2. 如果有业务异常需要特殊处理,记得:status.setRollbackOnly(); 标识为回滚
4. 源码详解
4.1 编程式事务TransactionTemplate

编程式事务,Spring 已经给我们提供好了模板类 TransactionTemplate,可以很方便的使用,如下图
在这里插入图片描述
TransactionTemplate 全路径名是:org.springframework.transaction.support.TransactionTemplate。看包名也知道了这是 spring 对事务的模板类。(spring动不动就是各种Template…),看下类图先
在这里插入图片描述
一看,哟西,实现了 TransactionOperations、InitializingBean 这2个接口(熟悉 Spring 源码的知道这个 InitializingBean 又是老套路),我们来看下接口源码如下

 public interface TransactionOperations {
 
     /**
      * Execute the action specified by the given callback object within a transaction.
      * <p>Allows for returning a result object created within the transaction, that is,
      * a domain object or a collection of domain objects. A RuntimeException thrown
      * by the callback is treated as a fatal exception that enforces a rollback.
      * Such an exception gets propagated to the caller of the template.
      * @param action the callback object that specifies the transactional action
      * @return a result object returned by the callback, or {@code null} if none
      * @throws TransactionException in case of initialization, rollback, or system errors
      * @throws RuntimeException if thrown by the TransactionCallback
      */
     <T> T execute(TransactionCallback<T> action) throws TransactionException;
 
 }
 
 public interface InitializingBean {
 
     /**
      * Invoked by a BeanFactory after it has set all bean properties supplied
      * (and satisfied BeanFactoryAware and ApplicationContextAware).
      * <p>This method allows the bean instance to perform initialization only
      * possible when all bean properties have been set and to throw an
      * exception in the event of misconfiguration.
      * @throws Exception in the event of misconfiguration (such
      * as failure to set an essential property) or if initialization fails.
      */
     void afterPropertiesSet() throws Exception;
 
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

如上图,TransactionOperations 这个接口用来执行事务的回调方法,InitializingBean 这个是典型的 spring bean 初始化流程中(飞机票:Spring IOC(四)总结升华篇)的预留接口,专用用来在 bean 属性加载完毕时执行的方法

回到正题,TransactionTemplate 的 2 个接口的 impl 方法做了什么?

     @Override
     public void afterPropertiesSet() {
         if (this.transactionManager == null) {
             throw new IllegalArgumentException("Property 'transactionManager' is required");
         }
     }
 
 
     @Override
     public <T> T execute(TransactionCallback<T> action) throws TransactionException {
       // 内部封装好的事务管理器
         if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
             return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
         }// 需要手动获取事务,执行方法,提交事务的管理器
         else {// 1.获取事务状态
             TransactionStatus status = this.transactionManager.getTransaction(this);
             T result;
             try {// 2.执行业务逻辑
                 result = action.doInTransaction(status);
             }
             catch (RuntimeException ex) {
                 // 应用运行时异常 -> 回滚
                 rollbackOnException(status, ex);
                 throw ex;
             }
             catch (Error err) {
                 // Error异常 -> 回滚
                 rollbackOnException(status, err);
                 throw err;
             }
             catch (Throwable ex) {
                 // 未知异常 -> 回滚
                 rollbackOnException(status, ex);
                 throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
             }// 3.事务提交
             this.transactionManager.commit(status);
             return result;
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

如上图所示,实际上 afterPropertiesSet 只是校验了事务管理器不为空,execute() 才是核心方法,execute 主要步骤:

  1. getTransaction() 获取事务
  2. doInTransaction() 执行业务逻辑,这里就是用户自定义的业务代码。如果是没有返回值的,就是 doInTransactionWithoutResult()
  3. commit() 事务提交:调用 AbstractPlatformTransactionManager 的 commit,rollbackOnException() 异常回滚:调用 AbstractPlatformTransactionManager 的 rollback(),事务提交回滚
4.2 申明式事务@Transactional
4.2.1 AOP相关概念

申明式事务使用的是 Spring AOP,即面向切面编程
AOP核心概念如下:
1. 通知(Advice):定义了切面(各处业务代码中都需要的逻辑提炼成的一个切面)做什么 what+when 何时使用。例如:前置通知Before、后置通知After、返回通知After-returning、异常通知After-throwing、环绕通知Around
2. 连接点(Joint point):程序执行过程中能够插入切面的点,一般有多个。比如调用方式时、抛出异常时
3. 切点(Pointcut):切点定义了连接点,切点包含多个连接点,即 where 哪里使用通知.通常指定类+方法 或者 正则表达式来匹配 类和方法名称
4. 切面(Aspect):切面=通知+切点,即 when+where+what 何时何地做什么
5. 引入(Introduction):允许我们向现有的类添加新方法或属性
6. 织入(Weaving):织入是把切面应用到目标对象并创建新的代理对象的过程

4.2.2 申明式事务

申明式事务整体调用过程,可以抽出2条线:

  1. 使用代理模式,生成代理增强类
  2. 根据代理事务管理配置类,配置事务的织入,在业务方法前后进行环绕增强,增加一些事务的相关操作。例如获取事务属性、提交事务、回滚事务

过程如下图:
在这里插入图片描述
申明式事务使用 @Transactional 这种注解的方式,那么我们就从 Springboot 容器启动时的自动配置载入(spring boot容器启动详解)开始看。在 /META-INF/spring.factories 中配置文件中查找,如下图
在这里插入图片描述
载入2个关于事务的自动配置类:

org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration,
org.springframework.boot.autoconfigure.transaction.jta.JtaAutoConfiguration,
  • 1
  • 2

jta 咱们就不看了,看一下 TransactionAutoConfiguration 这个自动配置类:

@Configuration
@ConditionalOnClass(PlatformTransactionManager.class)
@AutoConfigureAfter({ JtaAutoConfiguration.class, 
        HibernateJpaAutoConfiguration.class,
        DataSourceTransactionManagerAutoConfiguration.class,
        Neo4jDataAutoConfiguration.class })
@EnableConfigurationProperties(TransactionProperties.class)
public class TransactionAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public TransactionManagerCustomizers platformTransactionManagerCustomizers(
           ObjectProvider<List<PlatformTransactionManagerCustomizer<?>>> customizers) {
        return new TransactionManagerCustomizers(customizers.getIfAvailable());
    }

    @Configuration
    @ConditionalOnSingleCandidate(PlatformTransactionManager.class)
    public static class TransactionTemplateConfiguration {

        private final PlatformTransactionManager transactionManager;

        public TransactionTemplateConfiguration(
               PlatformTransactionManager transactionManager) {
            this.transactionManager = transactionManager;
        }

        @Bean
        @ConditionalOnMissingBean
        public TransactionTemplate transactionTemplate() {
            return new TransactionTemplate(this.transactionManager);
        }
    }

    @Configuration
    @ConditionalOnBean(PlatformTransactionManager.class)
    @ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class)
    public static class EnableTransactionManagementConfiguration {

        @Configuration
        @EnableTransactionManagement(proxyTargetClass = false)
        @ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "false", matchIfMissing = false)
        public static class JdkDynamicAutoProxyConfiguration {

        }
 
        @Configuration
        @EnableTransactionManagement(proxyTargetClass = true)
        @ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "true", matchIfMissing = true)
        public static class CglibAutoProxyConfiguration {
 
        }
 
    }
 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

TransactionAutoConfiguration 这个类主要看:

  1. 2个类注解
    @ConditionalOnClass(PlatformTransactionManager.class)即类路径下包含 PlatformTransactionManager 这个类时这个自动配置生效,这个类是 spring 事务的核心包,肯定引入了
    @AutoConfigureAfter({ JtaAutoConfiguration.class, HibernateJpaAutoConfiguration.class, DataSourceTransactionManagerAutoConfiguration.class, Neo4jDataAutoConfiguration.class }),这个配置在括号中的 4 个配置类后才生效

  2. 2个内部类
    TransactionTemplateConfiguration 事务模板配置类:
    @ConditionalOnSingleCandidate(PlatformTransactionManager.class) 当能够唯一确定一个 PlatformTransactionManager bean 时才生效
    @ConditionalOnMissingBean 如果没有定义 TransactionTemplate bean 生成一个

EnableTransactionManagementConfiguration 开启事务管理器配置类:
@ConditionalOnBean(PlatformTransactionManager.class) 当存在 PlatformTransactionManager bean 时生效
@ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class) 当没有自定义抽象事务管理器配置类时才生效。(即用户自定义抽象事务管理器配置类会优先,如果没有,就用这个默认事务管理器配置类)

EnableTransactionManagementConfiguration支持2种代理方式:
1. JdkDynamicAutoProxyConfiguration:
1. @EnableTransactionManagement(proxyTargetClass = false),即 proxyTargetClass = false 表示是 JDK 动态代理支持的是:面向接口代理
2. @ConditionalOnProperty(prefix = “spring.aop”, name = “proxy-target-class”, havingValue = “false”, matchIfMissing = false),即 spring.aop.proxy-target-class=false 时生效,且没有这个配置不生效
2. CglibAutoProxyConfiguration:
1. @EnableTransactionManagement(proxyTargetClass = true),即 proxyTargetClass = true 标识 Cglib 代理支持的是子类继承代理
2. @ConditionalOnProperty(prefix = “spring.aop”, name = “proxy-target-class”, havingValue = “true”, matchIfMissing = true),即 spring.aop.proxy-target-class=true 时生效,且没有这个配置默认生效

注意了,默认没有配置,走的 Cglib 代理。说明 @Transactional 注解支持直接加在类上
好吧,看了这么多配置类,终于到了 @EnableTransactionManagement 这个注解了

 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.RUNTIME)
 @Documented
 @Import(TransactionManagementConfigurationSelector.class)
 public @interface EnableTransactionManagement {
 
     //proxyTargetClass = false表示是JDK动态代理支持接口代理。true表示是Cglib代理支持子类继承代理。
     boolean proxyTargetClass() default false;
 
     //事务通知模式(切面织入方式),默认代理模式(同一个类中方法互相调用拦截器不会生效),可以选择增强型AspectJ
     AdviceMode mode() default AdviceMode.PROXY;
 
     //连接点上有多个通知时,排序,默认最低。值越大优先级越低。
     int order() default Ordered.LOWEST_PRECEDENCE;
 
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

重点看类注解 @Import(TransactionManagementConfigurationSelector.class)

TransactionManagementConfigurationSelector类图如下:
在这里插入图片描述
如上图所示,TransactionManagementConfigurationSelector 继承自 AdviceModeImportSelector 实现了 ImportSelector 接口

 public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
 
     /**
      * {@inheritDoc}
      * @return {@link ProxyTransactionManagementConfiguration} or
      * {@code AspectJTransactionManagementConfiguration} for {@code PROXY} and
      * {@code ASPECTJ} values of {@link EnableTransactionManagement#mode()}, respectively
      */
     @Override
     protected String[] selectImports(AdviceMode adviceMode) {
         switch (adviceMode) {
             case PROXY:
                 return new String[] {AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()};
             case ASPECTJ:
                 return new String[] {TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME};
             default:
                 return null;
         }
     }
 
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

如上图,最终会执行 selectImports 方法导入需要加载的类,我们只看 proxy 模式下,载入了 AutoProxyRegistrar、ProxyTransactionManagementConfiguration 2个类
AutoProxyRegistrar:给容器中注册一个 InfrastructureAdvisorAutoProxyCreator 组件;利用后置处理器机制在对象创建以后,包装对象,返回一个代理对象(增强器),代理对象执行方法利用拦截器链进行调用
ProxyTransactionManagementConfiguration:就是一个配置类,定义了事务增强器

  • AutoProxyRegistrar
    先看 AutoProxyRegistrar 实现了 ImportBeanDefinitionRegistrar 接口,复写 registerBeanDefinitions 方法,源码如下
 public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
         boolean candidateFound = false;
         Set<String> annoTypes = importingClassMetadata.getAnnotationTypes();
         for (String annoType : annoTypes) {
             AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annoType);
             if (candidate == null) {
                 continue;
             }
             Object mode = candidate.get("mode");
             Object proxyTargetClass = candidate.get("proxyTargetClass");
             if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
                     Boolean.class == proxyTargetClass.getClass()) {
                 candidateFound = true;
                 if (mode == AdviceMode.PROXY) {//代理模式
                     AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
                     if ((Boolean) proxyTargetClass) {//如果是CGLOB子类代理模式
                         AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
                         return;
                     }
                 }
             }
         }
         if (!candidateFound) {
             String name = getClass().getSimpleName();
             logger.warn(String.format("%s was imported but no annotations were found " +
                     "having both 'mode' and 'proxyTargetClass' attributes of type " +
                     "AdviceMode and boolean respectively. This means that auto proxy " +
                     "creator registration and configuration may not have occurred as " +
                     "intended, and components may not be proxied as expected. Check to " +
                     "ensure that %s has been @Import'ed on the same class where these " +
                     "annotations are declared; otherwise remove the import of %s " +
                     "altogether.", name, name, name));
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

代理模式:AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);

最终调用的是:registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);基础构建增强自动代理构造器

 private static BeanDefinition registerOrEscalateApcAsRequired(Class<?> cls, BeanDefinitionRegistry registry, Object source) {
         Assert.notNull(registry, "BeanDefinitionRegistry must not be null");
     //如果当前注册器包含internalAutoProxyCreator
         if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {//org.springframework.aop.config.internalAutoProxyCreator内部自动代理构造器
             BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
             if (!cls.getName().equals(apcDefinition.getBeanClassName())) {//如果当前类不是internalAutoProxyCreator
                 int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName());
                 int requiredPriority = findPriorityForClass(cls);
                 if (currentPriority < requiredPriority) {//如果下标大于已存在的内部自动代理构造器,index越小,优先级越高,InfrastructureAdvisorAutoProxyCreator index=0,requiredPriority最小,不进入
                     apcDefinition.setBeanClassName(cls.getName());
                 }
             }
             return null;//直接返回
         }//如果当前注册器不包含internalAutoProxyCreator,则把当前类作为根定义
         RootBeanDefinition beanDefinition = new RootBeanDefinition(cls);
         beanDefinition.setSource(source);
         beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE);//优先级最高
         beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
         registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition);
         return beanDefinition;
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

如上图,APC_PRIORITY_LIST 列表如下图:

 /**
      * Stores the auto proxy creator classes in escalation order.
      */
     private static final List<Class<?>> APC_PRIORITY_LIST = new ArrayList<Class<?>>();
 
     /**
      * 优先级上升list
      */
     static {
         APC_PRIORITY_LIST.add(InfrastructureAdvisorAutoProxyCreator.class);
         APC_PRIORITY_LIST.add(AspectJAwareAdvisorAutoProxyCreator.class);
         APC_PRIORITY_LIST.add(AnnotationAwareAspectJAutoProxyCreator.class);
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

如上图,由于 InfrastructureAdvisorAutoProxyCreator 这个类在 list 中第一个 index=0,requiredPriority 最小,不进入,所以没有重置 beanClassName,啥都没做,返回 null
那么增强代理类何时生成呢?
InfrastructureAdvisorAutoProxyCreator 类图如下:
在这里插入图片描述
如上图所示,看2个核心方法:InstantiationAwareBeanPostProcessor 接口的 postProcessBeforeInstantiation 实例化前+BeanPostProcessor 接口的 postProcessAfterInitialization 初始化后

     @Override
     public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException {
         Object cacheKey = getCacheKey(beanClass, beanName);
 
         if (beanName == null || !this.targetSourcedBeans.contains(beanName)) {
             if (this.advisedBeans.containsKey(cacheKey)) {//如果已经存在直接返回
                 return null;
             }//是否基础构件(基础构建不需要代理):Advice、Pointcut、Advisor、AopInfrastructureBean这四类都算基础构建
             if (isInfrastructureClass(beanClass) || shouldSkip(beanClass, beanName)) {
                 this.advisedBeans.put(cacheKey, Boolean.FALSE);//添加进advisedBeans ConcurrentHashMap<k=Object,v=Boolean>标记是否需要增强实现,这里基础构建bean不需要代理,都置为false,供后面postProcessAfterInitialization实例化后使用。
                 return null;
             }
         }
 
         // TargetSource是spring aop预留给我们用户自定义实例化的接口,如果存在TargetSource就不会默认实例化,而是按照用户自定义的方式实例化,咱们没有定义,不进入
         if (beanName != null) {
             TargetSource targetSource = getCustomTargetSource(beanClass, beanName);
             if (targetSource != null) {
                 this.targetSourcedBeans.add(beanName);
                 Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(beanClass, beanName, targetSource);
                 Object proxy = createProxy(beanClass, beanName, specificInterceptors, targetSource);
                 this.proxyTypes.put(cacheKey, proxy.getClass());
                 return proxy;
             }
         }
 
         return null;
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

通过追踪,由于 InfrastructureAdvisorAutoProxyCreator 是基础构建类,

advisedBeans.put(cacheKey, Boolean.FALSE)

添加进 advisedBeans ConcurrentHashMap<k=Object,v=Boolean> 标记是否需要增强实现,这里基础构建 bean 不需要代理,都置为 false,供后面 postProcessAfterInitialization 实例化后使用

我们再看 postProcessAfterInitialization 源码如下:

     @Override
     public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
         if (bean != null) {
             Object cacheKey = getCacheKey(bean.getClass(), beanName);
             if (!this.earlyProxyReferences.contains(cacheKey)) {
                 return wrapIfNecessary(bean, beanName, cacheKey);
             }
         }
         return bean;
     }
 
     protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
       // 如果是用户自定义获取实例,不需要增强处理,直接返回
         if (beanName != null && this.targetSourcedBeans.contains(beanName)) {
             return bean;
         }// 查询map缓存,标记过false,不需要增强直接返回
         if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
             return bean;
         }// 判断一遍springAOP基础构建类,标记过false,不需要增强直接返回
         if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
             this.advisedBeans.put(cacheKey, Boolean.FALSE);
             return bean;
         }
 
         // 获取增强List<Advisor> advisors
         Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
       // 如果存在增强
         if (specificInterceptors != DO_NOT_PROXY) {
             this.advisedBeans.put(cacheKey, Boolean.TRUE);// 标记增强为TRUE,表示需要增强实现
          // 生成增强代理类
             Object proxy = createProxy(
                     bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
             this.proxyTypes.put(cacheKey, proxy.getClass());
             return proxy;
         }
      // 如果不存在增强,标记false,作为缓存,再次进入提高效率,第16行利用缓存先校验
         this.advisedBeans.put(cacheKey, Boolean.FALSE);
         return bean;
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

下面看核心方法 createProxy 如下:

  • ProxyTransactionManagementConfiguration
 @Configuration
 public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
 
     @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
     @Role(BeanDefinition.ROLE_INFRASTRUCTURE)//定义事务增强器
     public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
         BeanFactoryTransactionAttributeSourceAdvisor j = new BeanFactoryTransactionAttributeSourceAdvisor();
         advisor.setTransactionAttributeSource(transactionAttributeSource());
         advisor.setAdvice(transactionInterceptor());
         advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
         return advisor;
     }
 
     @Bean
     @Role(BeanDefinition.ROLE_INFRASTRUCTURE)//定义基于注解的事务属性资源
     public TransactionAttributeSource transactionAttributeSource() {
         return new AnnotationTransactionAttributeSource();
     }
 
     @Bean
     @Role(BeanDefinition.ROLE_INFRASTRUCTURE)//定义事务拦截器
     public TransactionInterceptor transactionInterceptor() {
         TransactionInterceptor interceptor = new TransactionInterceptor();
         interceptor.setTransactionAttributeSource(transactionAttributeSource());
         if (this.txManager != null) {
             interceptor.setTransactionManager(this.txManager);
         }
         return interceptor;
     }
 
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

核心方法:transactionAdvisor() 事务织入

定义了一个 advisor,设置事务属性、设置事务拦截器 TransactionInterceptor、设置顺序。核心就是事务拦截器 TransactionInterceptor

TransactionInterceptor 使用通用的 spring 事务基础架构实现“声明式事务”,继承自 TransactionAspectSupport 类(该类包含与Spring的底层事务API的集成),实现了 MethodInterceptor 接口。spring 类图如下
在这里插入图片描述
事务拦截器的拦截功能就是依靠实现了 MethodInterceptor 接口,熟悉 spring 的同学肯定很熟悉 MethodInterceptor 了,这个是 spring 的方法拦截器,主要看 invoke 方法:

 @Override
     public Object invoke(final MethodInvocation invocation) throws Throwable {
         // Work out the target class: may be {@code null}.
         // The TransactionAttributeSource should be passed the target class
         // as well as the method, which may be from an interface.
         Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
 
         // 调用TransactionAspectSupport的 invokeWithinTransaction方法
         return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
             @Override
             public Object proceedWithInvocation() throws Throwable {
                 return invocation.proceed();
             }
         });
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

如上图 TransactionInterceptor 复写 MethodInterceptor 接口的 invoke 方法,并在 invoke 方法中调用了父类 TransactionAspectSupport 的 invokeWithinTransaction() 方法,源码如下:

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
             throws Throwable {  
 
         // 如果transaction attribute为空,该方法就是非事务(非编程式事务)
         final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
         final PlatformTransactionManager tm = determineTransactionManager(txAttr);
         final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
      // 标准声明式事务:如果事务属性为空 或者 非回调偏向的事务管理器
         if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
             // Standard transaction demarcation with getTransaction and commit/rollback calls.
             TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
             Object retVal = null;
             try {
                 // 这里就是一个环绕增强,在这个proceed前后可以自己定义增强实现
                 // 方法执行
                 retVal = invocation.proceedWithInvocation();
             }
             catch (Throwable ex) {
                 // 根据事务定义的,该异常需要回滚就回滚,否则提交事务
                 completeTransactionAfterThrowing(txInfo, ex);
                 throw ex;
             }
             finally {//清空当前事务信息,重置为老的
                 cleanupTransactionInfo(txInfo);
             }//返回结果之前提交事务
             commitTransactionAfterReturning(txInfo);
             return retVal;
         }
      // 编程式事务:(回调偏向)
         else {
             final ThrowableHolder throwableHolder = new ThrowableHolder();
 
             // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
             try {
                 Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                         new TransactionCallback<Object>() {
                             @Override
                             public Object doInTransaction(TransactionStatus status) {
                                 TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                 try {
                                     return invocation.proceedWithInvocation();
                                 }
                                 catch (Throwable ex) {// 如果该异常需要回滚
                                     if (txAttr.rollbackOn(ex)) {
                                         // 如果是运行时异常返回
                                         if (ex instanceof RuntimeException) {
                                             throw (RuntimeException) ex;
                                         }// 如果是其它异常都抛ThrowableHolderException
                                         else {
                                             throw new ThrowableHolderException(ex);
                                         }
                                     }// 如果不需要回滚
                                     else {
                                         // 定义异常,最终就直接提交事务了
                                         throwableHolder.throwable = ex;
                                         return null;
                                     }
                                 }
                                 finally {//清空当前事务信息,重置为老的
                                     cleanupTransactionInfo(txInfo);
                                 }
                             }
                         });
 
                 // 上抛异常
                 if (throwableHolder.throwable != null) {
                     throw throwableHolder.throwable;
                 }
                 return result;
             }
             catch (ThrowableHolderException ex) {
                 throw ex.getCause();
             }
             catch (TransactionSystemException ex2) {
                 if (throwableHolder.throwable != null) {
                     logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                     ex2.initApplicationException(throwableHolder.throwable);
                 }
                 throw ex2;
             }
             catch (Throwable ex2) {
                 if (throwableHolder.throwable != null) {
                     logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                 }
                 throw ex2;
             }
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88

如上图,我们主要看第一个分支,申明式事务,核心流程如下:

  1. createTransactionIfNecessary():如果有必要,创建事务
  2. InvocationCallback的proceedWithInvocation():InvocationCallback 是父类的内部回调接口,子类中实现该接口供父类调用,子类 TransactionInterceptor 中 invocation.proceed()。回调方法执行
  3. 异常回滚 completeTransactionAfterThrowing()
  • createTransactionIfNecessary():
 protected TransactionInfo createTransactionIfNecessary(
             PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
 
         // 如果还没有定义名字,把连接点的ID定义成事务的名称
         if (txAttr != null && txAttr.getName() == null) {
             txAttr = new DelegatingTransactionAttribute(txAttr) {
                 @Override
                 public String getName() {
                     return joinpointIdentification;
                 }
             };
         }
 
         TransactionStatus status = null;
         if (txAttr != null) {
             if (tm != null) {
                 status = tm.getTransaction(txAttr);
             }
             else {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                             "] because no transaction manager has been configured");
                 }
             }
         }
         return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

核心就是:
(1):getTransaction(),根据事务属性获取事务 TransactionStatus,大道归一,都是调用 PlatformTransactionManager.getTransaction()
(2):prepareTransactionInfo(),构造一个 TransactionInfo 事务信息对象,绑定当前线程:ThreadLocal

  • invocation.proceed()回调业务方法:
    最终实现类是 ReflectiveMethodInvocation,类图如下:
    在这里插入图片描述
    如上图,ReflectiveMethodInvocation 类实现了 ProxyMethodInvocation 接口,但是 ProxyMethodInvocation 继承了3层接口…ProxyMethodInvocation->MethodInvocation->Invocation->Joinpoint

Joinpoint:连接点接口,定义了执行接口:Object proceed() throws Throwable; 执行当前连接点,并跳到拦截器链上的下一个拦截器
Invocation:调用接口,继承自 Joinpoint,定义了获取参数接口: Object[] getArguments();是一个带参数的、可被拦截器拦截的连接点
MethodInvocation:方法调用接口,继承自 Invocation,定义了获取方法接口:Method getMethod(); 是一个带参数的可被拦截的连接点方法
ProxyMethodInvocation:代理方法调用接口,继承自 MethodInvocation,定义了获取代理对象接口:Object getProxy();是一个由代理类执行的方法调用连接点方法
ReflectiveMethodInvocation:实现了 ProxyMethodInvocation 接口,自然就实现了父类接口的的所有接口。获取代理类,获取方法,获取参数,用代理类执行这个方法并且自动跳到下一个连接点

下面看一下 proceed 方法源码:

 @Override
     public Object proceed() throws Throwable {
         //    启动时索引为-1,唤醒连接点,后续递增
         if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
             return invokeJoinpoint();
         }
 
         Object interceptorOrInterceptionAdvice =
                 this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
         if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
             // 这里进行动态方法匹配校验,静态的方法匹配早已经校验过了(MethodMatcher接口有两种典型:动态/静态校验)
             InterceptorAndDynamicMethodMatcher dm =
                     (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
             if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
                 return dm.interceptor.invoke(this);
             }
             else {
                 // 动态匹配失败,跳过当前拦截,进入下一个(拦截器链)
                 return proceed();
             }
         }
         else {
             // 它是一个拦截器,所以我们只调用它:在构造这个对象之前,切入点将被静态地计算。
             return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

咱们这里最终调用的是 ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this); 就是 TransactionInterceptor 事务拦截器回调 目标业务方法(addUserBalanceAndUser)

  • completeTransactionAfterThrowing()
    最终调用 AbstractPlatformTransactionManager 的 rollback(),提交事务 commitTransactionAfterReturning() 最终调用 AbstractPlatformTransactionManager的commit()

  • 总结:
    可见不管是编程式事务,还是声明式事务,最终源码都是调用事务管理器的 PlatformTransactionManager 接口的3个方法:

    1. getTransaction
    2. commit
    3. rollback
4.3 事务核心源码

在这里插入图片描述如上提所示,PlatformTransactionManager 顶级接口定义了最核心的事务管理方法,下面一层是 AbstractPlatformTransactionManager 抽象类,实现了 PlatformTransactionManager 接口的方法并定义了一些抽象方法,供子类拓展。最后下面一层是2个经典事务管理器:

  1. DataSourceTransactionmanager,即 JDBC 单数据库事务管理器,基于 Connection 实现
  2. JtaTransactionManager,即多数据库事务管理器(又叫做分布式事务管理器),其实现了 JTA 规范,使用 XA 协议进行两阶段提交

我们这里只看基于 JDBC connection 的 DataSourceTransactionmanager 源码

PlatformTransactionManager接口:

public interface PlatformTransactionManager {
    // 获取事务状态
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
  // 事务提交
    void commit(TransactionStatus status) throws TransactionException;
  // 事务回滚
    void rollback(TransactionStatus status) throws TransactionException;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
4.3.1 getTransaction获取事务

在这里插入图片描述
AbstractPlatformTransactionManager 实现了 getTransaction() 方法如下:

     @Override
     public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
         Object transaction = doGetTransaction();
 
         // Cache debug flag to avoid repeated checks.
         boolean debugEnabled = logger.isDebugEnabled();
 
         if (definition == null) {
             // Use defaults if no transaction definition given.
             definition = new DefaultTransactionDefinition();
         }
       // 如果当前已经存在事务
         if (isExistingTransaction(transaction)) {
             // 根据不同传播机制不同处理
             return handleExistingTransaction(definition, transaction, debugEnabled);
         }
 
         // 超时不能小于默认值
         if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
             throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
         }
 
         // 当前不存在事务,传播机制=MANDATORY(支持当前事务,没事务报错),报错
         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
             throw new IllegalTransactionStateException(
                     "No existing transaction found for transaction marked with propagation 'mandatory'");
         }// 当前不存在事务,传播机制=REQUIRED/REQUIRED_NEW/NESTED,这三种情况,需要新开启事务,且加上事务同步
         else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
             SuspendedResourcesHolder suspendedResources = suspend(null);
             if (debugEnabled) {
                 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
             }
             try {// 是否需要新开启同步// 开启// 开启
                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                 DefaultTransactionStatus status = newTransactionStatus(
                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                 doBegin(transaction, definition);// 开启新事务
                 prepareSynchronization(status, definition);//预备同步
                 return status;
             }
             catch (RuntimeException ex) {
                 resume(null, suspendedResources);
                 throw ex;
             }
             catch (Error err) {
                 resume(null, suspendedResources);
                 throw err;
             }
         }
         else {
             // 当前不存在事务当前不存在事务,且传播机制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,这三种情况,创建“空”事务:没有实际事务,但可能是同步。警告:定义了隔离级别,但并没有真实的事务初始化,隔离级别被忽略有隔离级别但是并没有定义实际的事务初始化,有隔离级别但是并没有定义实际的事务初始化,
             if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                 logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                         "isolation level will effectively be ignored: " + definition);
             }
             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
             return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

如上图,源码分成了2条处理线:

  1. 当前已存在事务:isExistingTransaction() 判断是否存在事务,存在事务 handleExistingTransaction() 根据不同传播机制不同处理
  2. 当前不存在事务: 不同传播机制不同处理

handleExistingTransaction() 源码如下:

     private TransactionStatus handleExistingTransaction(
             TransactionDefinition definition, Object transaction, boolean debugEnabled)
             throws TransactionException {
      // 1.NERVER(不支持当前事务;如果当前事务存在,抛出异常)报错
         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
             throw new IllegalTransactionStateException(
                     "Existing transaction found for transaction marked with propagation 'never'");
         }
       // 2.NOT_SUPPORTED(不支持当前事务,现有同步将被挂起)挂起当前事务
         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
             if (debugEnabled) {
                 logger.debug("Suspending current transaction");
             }
             Object suspendedResources = suspend(transaction);
             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
             return prepareTransactionStatus(
                     definition, null, false, newSynchronization, debugEnabled, suspendedResources);
         }
      // 3.REQUIRES_NEW挂起当前事务,创建新事务
         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
             if (debugEnabled) {
                 logger.debug("Suspending current transaction, creating new transaction with name [" +
                         definition.getName() + "]");
             }// 挂起当前事务
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
             try {// 创建新事务
                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                 DefaultTransactionStatus status = newTransactionStatus(
                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                 doBegin(transaction, definition);
                 prepareSynchronization(status, definition);
                 return status;
             }
             catch (RuntimeException beginEx) {
                 resumeAfterBeginException(transaction, suspendedResources, beginEx);
                 throw beginEx;
             }
             catch (Error beginErr) {
                 resumeAfterBeginException(transaction, suspendedResources, beginErr);
                 throw beginErr;
             }
         }
      // 4.NESTED嵌套事务
         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
             if (!isNestedTransactionAllowed()) {
                 throw new NestedTransactionNotSupportedException(
                         "Transaction manager does not allow nested transactions by default - " +
                         "specify 'nestedTransactionAllowed' property with value 'true'");
             }
             if (debugEnabled) {
                 logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
             }// 是否支持保存点:非JTA事务走这个分支。AbstractPlatformTransactionManager默认是true,JtaTransactionManager复写了该方法false,DataSourceTransactionmanager没有复写,还是true,
             if (useSavepointForNestedTransaction()) { 
                 // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                 DefaultTransactionStatus status =
                         prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                 status.createAndHoldSavepoint();// 创建保存点
                 return status;
             }
             else {
                 // JTA事务走这个分支,创建新事务
                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                 DefaultTransactionStatus status = newTransactionStatus(
                         definition, transaction, true, newSynchronization, debugEnabled, null);
                 doBegin(transaction, definition);
                 prepareSynchronization(status, definition);
                 return status;
             }
         }
 
         
         if (debugEnabled) {
             logger.debug("Participating in existing transaction");
         }
         if (isValidateExistingTransaction()) {
             if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                 Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                 if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                     Constants isoConstants = DefaultTransactionDefinition.constants;
                     throw new IllegalTransactionStateException("Participating transaction with definition [" +
                             definition + "] specifies isolation level which is incompatible with existing transaction: " +
                             (currentIsolationLevel != null ?
                                     isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                     "(unknown)"));
                 }
             }
             if (!definition.isReadOnly()) {
                 if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                     throw new IllegalTransactionStateException("Participating transaction with definition [" +
                             definition + "] is not marked as read-only but existing transaction is");
                 }
             }
         }// 到这里PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY,存在事务加入事务即可,prepareTransactionStatus第三个参数就是是否需要新事务。false代表不需要新事物
         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
         return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96

如上图,当前线程已存在事务情况下,新的不同隔离级别处理情况:

  1. NERVER:不支持当前事务;如果当前事务存在,抛出异常:“Existing transaction found for transaction marked with propagation ‘never’”
  2. NOT_SUPPORTED:不支持当前事务,现有同步将被挂起:suspend()
  3. REQUIRES_NEW挂起当前事务,创建新事务:
    1. suspend()
    2. doBegin()
  4. NESTED嵌套事务
    1. 非JTA事务:createAndHoldSavepoint()创建JDBC3.0保存点,不需要同步
    2. JTA事务:开启新事务,doBegin()+prepareSynchronization()需要同步

这里有几个核心方法:挂起当前事务 suspend()、开启新事务 doBegin()

suspend() 源码如下:

 protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
         if (TransactionSynchronizationManager.isSynchronizationActive()) {// 1.当前存在同步,
             List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
             try {
                 Object suspendedResources = null;
                 if (transaction != null) {// 事务不为空,挂起事务
                     suspendedResources = doSuspend(transaction);
                 }// 解除绑定当前事务各种属性:名称、只读、隔离级别、是否是真实的事务.
                 String name = TransactionSynchronizationManager.getCurrentTransactionName();
                 TransactionSynchronizationManager.setCurrentTransactionName(null);
                 boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                 TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                 Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                 TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
                 boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                 TransactionSynchronizationManager.setActualTransactionActive(false);
                 return new SuspendedResourcesHolder(
                         suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
             }
             catch (RuntimeException ex) {
                 // doSuspend failed - original transaction is still active...
                 doResumeSynchronization(suspendedSynchronizations);
                 throw ex;
             }
             catch (Error err) {
                 // doSuspend failed - original transaction is still active...
                 doResumeSynchronization(suspendedSynchronizations);
                 throw err;
             }
         }// 2.没有同步但,事务不为空,挂起事务
         else if (transaction != null) {
             // Transaction active but no synchronization active.
             Object suspendedResources = doSuspend(transaction);
             return new SuspendedResourcesHolder(suspendedResources);
         }// 2.没有同步但,事务为空,什么都不用做
         else {
             // Neither transaction nor synchronization active.
             return null;
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

doSuspend() 挂起事务,AbstractPlatformTransactionManager 抽象类 doSuspend() 会报错:不支持挂起,如果具体事务执行器支持就复写 doSuspend(),DataSourceTransactionManager 实现如下:

     @Override
     protected Object doSuspend(Object transaction) {
         DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
         txObject.setConnectionHolder(null);
         return TransactionSynchronizationManager.unbindResource(this.dataSource);
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

挂起 DataSourceTransactionManager 事务的核心操作就是:

  1. 把当前事务的 connectionHolder 数据库连接持有者清空
  2. 当前线程解绑 datasource。其实就是 ThreadLocal 移除对应变量(TransactionSynchronizationManager 类中定义的 private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>(“Transactional resources”);)

TransactionSynchronizationManager 事务同步管理器,该类维护了多个线程本地变量 ThreadLocal,如下图:

 public abstract class TransactionSynchronizationManager {
 
     private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);
     // 事务资源:map<k,v> 两种数据对。1.会话工厂和会话k=SqlsessionFactory v=SqlSessionHolder 2.数据源和连接k=DataSource v=ConnectionHolder
     private static final ThreadLocal<Map<Object, Object>> resources =
             new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
     // 事务同步
     private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
             new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
   // 当前事务名称
     private static final ThreadLocal<String> currentTransactionName =
             new NamedThreadLocal<String>("Current transaction name");
   // 当前事务的只读属性
     private static final ThreadLocal<Boolean> currentTransactionReadOnly =
             new NamedThreadLocal<Boolean>("Current transaction read-only status");
   // 当前事务的隔离级别
     private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
             new NamedThreadLocal<Integer>("Current transaction isolation level");
   // 是否存在事务
     private static final ThreadLocal<Boolean> actualTransactionActive =
             new NamedThreadLocal<Boolean>("Actual transaction active");
 。。。
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

doBegin()源码如下:

     @Override
     protected void doBegin(Object transaction, TransactionDefinition definition) {
         DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
         Connection con = null;
 
         try {// 如果事务还没有connection或者connection在事务同步状态,重置新的connectionHolder
             if (!txObject.hasConnectionHolder() ||
                     txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                 Connection newCon = this.dataSource.getConnection();
                 if (logger.isDebugEnabled()) {
                     logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                 }// 重置新的connectionHolder
                 txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
             }
       //设置新的连接为事务同步中
             txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
             con = txObject.getConnectionHolder().getConnection();
          //conn设置事务隔离级别,只读
             Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
             txObject.setPreviousIsolationLevel(previousIsolationLevel);//DataSourceTransactionObject设置事务隔离级别
 
             // 如果是自动提交切换到手动提交
             // so we don't want to do it unnecessarily (for example if we've explicitly
             // configured the connection pool to set it already).
             if (con.getAutoCommit()) {
                 txObject.setMustRestoreAutoCommit(true);
                 if (logger.isDebugEnabled()) {
                     logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                 }
                 con.setAutoCommit(false);
             }
       // 如果只读,执行sql设置事务只读
             prepareTransactionalConnection(con, definition);
             txObject.getConnectionHolder().setTransactionActive(true);// 设置connection持有者的事务开启状态
 
             int timeout = determineTimeout(definition);
             if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                 txObject.getConnectionHolder().setTimeoutInSeconds(timeout);// 设置超时秒数
             }
 
             // 绑定connection持有者到当前线程
             if (txObject.isNewConnectionHolder()) {
                 TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
             }
         }
 
         catch (Throwable ex) {
             if (txObject.isNewConnectionHolder()) {
                 DataSourceUtils.releaseConnection(con, this.dataSource);
                 txObject.setConnectionHolder(null, false);
             }
             throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

如上图,开启新事务的准备工作 doBegin() 的核心操作就是:

  1. DataSourceTransactionObject “数据源事务对象”,设置ConnectionHolder,再给 ConnectionHolder 设置各种属性:自动提交、超时、事务开启、隔离级别
  2. 给当前线程绑定一个线程本地变量,key=DataSource 数据源 ,v=ConnectionHolder 数据库连接
4.3.2 commit提交事务

讲解源码之前先看一下资源管理类:

SqlSessionSynchronization 是 SqlSessionUtils 的一个内部类,继承自 TransactionSynchronizationAdapter 抽象类,实现了事务同步接口TransactionSynchronization
类图如下:
在这里插入图片描述
TransactionSynchronization 接口定义了事务操作时的对应资源的(JDBC事务那么就是SqlSessionSynchronization)管理方法:

     // 挂起事务   
   void suspend();
     
     // 唤醒事务
     void resume();
     
     void flush();
 
     // 提交事务前
     void beforeCommit(boolean readOnly);
 
     // 提交事务完成前
     void beforeCompletion();
 
     // 提交事务后
     void afterCommit();
 
     // 提交事务完成后
     void afterCompletion(int status);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

后续很多都是使用这些接口管理事务

commit提交事务
在这里插入图片描述
AbstractPlatformTransactionManager 的 commit 源码如下:

     @Override
     public final void commit(TransactionStatus status) throws TransactionException {
         if (status.isCompleted()) {// 如果事务已完结,报错无法再次提交
             throw new IllegalTransactionStateException(
                     "Transaction is already completed - do not call commit or rollback more than once per transaction");
         }
 
         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
         if (defStatus.isLocalRollbackOnly()) {// 如果事务明确标记为回滚,
             if (defStatus.isDebug()) {
                 logger.debug("Transactional code has requested rollback");
             }
             processRollback(defStatus);//执行回滚
             return;
         }//如果不需要全局回滚时提交 且 全局回滚
         if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
             if (defStatus.isDebug()) {
                 logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
             }//执行回滚
             processRollback(defStatus);
             // 仅在最外层事务边界(新事务)或显式地请求时抛出“未期望的回滚异常”
             if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                 throw new UnexpectedRollbackException(
                         "Transaction rolled back because it has been marked as rollback-only");
             }
             return;
         }
      // 执行提交事务
         processCommit(defStatus);
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

如上图,各种判断:

  1. 如果事务明确标记为本地回滚,执行回滚
  2. 如果不需要全局回滚时提交且全局回滚,执行回滚
  3. 提交事务,核心方法processCommit()

processCommit 如下:

 private void processCommit(DefaultTransactionStatus status) throws TransactionException {
         try {
             boolean beforeCompletionInvoked = false;
             try {//3个前置操作
                 prepareForCommit(status);
                 triggerBeforeCommit(status);
                 triggerBeforeCompletion(status);
                 beforeCompletionInvoked = true;//3个前置操作已调用
                 boolean globalRollbackOnly = false;//新事务 或 全局回滚失败
                 if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                     globalRollbackOnly = status.isGlobalRollbackOnly();
                 }//1.有保存点,即嵌套事务
                 if (status.hasSavepoint()) {
                     if (status.isDebug()) {
                         logger.debug("Releasing transaction savepoint");
                     }//释放保存点
                     status.releaseHeldSavepoint();
                 }//2.新事务
                 else if (status.isNewTransaction()) {
                     if (status.isDebug()) {
                         logger.debug("Initiating transaction commit");
                     }//调用事务处理器提交事务
                     doCommit(status);
                 }
                 // 3.非新事务,且全局回滚失败,但是提交时没有得到异常,抛出异常
                 if (globalRollbackOnly) {
                     throw new UnexpectedRollbackException(
                             "Transaction silently rolled back because it has been marked as rollback-only");
                 }
             }
             catch (UnexpectedRollbackException ex) {
                 // 触发完成后事务同步,状态为回滚
                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                 throw ex;
             }// 事务异常
             catch (TransactionException ex) {
                 // 提交失败回滚
                 if (isRollbackOnCommitFailure()) {
                     doRollbackOnCommitException(status, ex);
                 }// 触发完成后回调,事务同步状态为未知
                 else {
                     triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                 }
                 throw ex;
             }// 运行时异常
             catch (RuntimeException ex) {
            // 如果3个前置步骤未完成,调用前置的最后一步操作
                 if (!beforeCompletionInvoked) {
                     triggerBeforeCompletion(status);
                 }// 提交异常回滚
                 doRollbackOnCommitException(status, ex);
                 throw ex;
             }// 其它异常
             catch (Error err) {  
            // 如果3个前置步骤未完成,调用前置的最后一步操作
                 if (!beforeCompletionInvoked) {
                     triggerBeforeCompletion(status);
                 }// 提交异常回滚
                 doRollbackOnCommitException(status, err);
                 throw err;
             }
 
             // Trigger afterCommit callbacks, with an exception thrown there
             // propagated to callers but the transaction still considered as committed.
             try {
                 triggerAfterCommit(status);
             }
             finally {
                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
             }
 
         }
         finally {
             cleanupAfterCompletion(status);
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

如上图,commit 事务时,有6个核心操作,分别是3个前置操作,3个后置操作,如下:

  1. prepareForCommit(status):源码是空的,没有拓展目前

  2. triggerBeforeCommit(status):提交前触发操作

    protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
        if (status.isNewSynchronization()) {
            if (status.isDebug()) {
                logger.trace("Triggering beforeCommit synchronization");
            }
            TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

triggerBeforeCommit 源码如下:

     public static void triggerBeforeCommit(boolean readOnly) {
         for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
             synchronization.beforeCommit(readOnly);
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5

如上图,TransactionSynchronizationManager 类定义了多个 ThreadLocal(线程本地变量),其中一个用以保存当前线程的事务同步:

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
  • 1

遍历事务同步器,把每个事务同步器都执行“提交前”操作,比如咱们用的 jdbc 事务,那么最终就是 SqlSessionUtils.beforeCommit()->this.holder.getSqlSession().commit(); 提交会话。(源码由于是 spring 管理实务,最终不会执行事务提交,例如是 DefaultSqlSession:执行清除缓存、重置状态操作)

  1. triggerBeforeCompletion(status):完成前触发操作,如果是 jdbc 事务,那么最终就是,

SqlSessionUtils.beforeCompletion -> TransactionSynchronizationManager.unbindResource(sessionFactory); 解绑当前线程的会话工厂

this.holder.getSqlSession().close();关闭会话。(源码由于是spring管理实务,最终不会执行事务close操作,例如是DefaultSqlSession,也会执行各种清除收尾操作)

  1. triggerAfterCommit(status):提交事务后触发操作
    TransactionSynchronizationUtils.triggerAfterCommit() -> TransactionSynchronizationUtils.invokeAfterCommit,如下:
 public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
         if (synchronizations != null) {
             for (TransactionSynchronization synchronization : synchronizations) {
                 synchronization.afterCommit();
             }
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

好吧,一顿找,最后在 TransactionSynchronizationAdapter 中复写过,并且是空的…SqlSessionSynchronization 继承了 TransactionSynchronizationAdapter 但是没有复写这个方法

  1. triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);

TransactionSynchronizationUtils.TransactionSynchronizationUtils.invokeAfterCompletion,如下:

     public static void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {
         if (synchronizations != null) {
             for (TransactionSynchronization synchronization : synchronizations) {
                 try {
                     synchronization.afterCompletion(completionStatus);
                 }
                 catch (Throwable tsex) {
                     logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
                 }
             }
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

afterCompletion:对于 JDBC 事务来说,最终:
1. 如果会话任然活着,关闭会话
2. 重置各种属性:SQL 会话同步器(SqlSessionSynchronization)的 SQL 会话持有者(SqlSessionHolder)的referenceCount引用计数、synchronizedWithTransaction同步事务、rollbackOnly只回滚、deadline 超时时间点

  1. cleanupAfterCompletion(status);
    1. 设置事务状态为已完成
    2. 如果是新的事务同步,解绑当前线程绑定的数据库资源,重置数据库连接
    3. 如果存在挂起的事务(嵌套事务),唤醒挂起的老事务的各种资源:数据库资源、同步器
     private void cleanupAfterCompletion(DefaultTransactionStatus status) {
         status.setCompleted();//设置事务状态完成
     //如果是新的同步,清空当前线程绑定的除了资源外的全部线程本地变量:包括事务同步器、事务名称、只读属性、隔离级别、真实的事务激活状态
         if (status.isNewSynchronization()) {
             TransactionSynchronizationManager.clear();
         }//如果是新的事务同步
         if (status.isNewTransaction()) {
             doCleanupAfterCompletion(status.getTransaction());
         }//如果存在挂起的资源
         if (status.getSuspendedResources() != null) {
             if (status.isDebug()) {
                 logger.debug("Resuming suspended transaction after completion of inner transaction");
             }//唤醒挂起的事务和资源(重新绑定之前挂起的数据库资源,唤醒同步器,注册同步器到TransactionSynchronizationManager)
             resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

对于DataSourceTransactionManager,doCleanupAfterCompletion源码如下:

     protected void doCleanupAfterCompletion(Object transaction) {
         DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
 
         // 如果是最新的连接持有者,解绑当前线程绑定的<数据库资源,ConnectionHolder>
         if (txObject.isNewConnectionHolder()) {
             TransactionSynchronizationManager.unbindResource(this.dataSource);
         }
 
         // 重置数据库连接(隔离级别、只读)
         Connection con = txObject.getConnectionHolder().getConnection();
         try {
             if (txObject.isMustRestoreAutoCommit()) {
                 con.setAutoCommit(true);
             }
             DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
         }
         catch (Throwable ex) {
             logger.debug("Could not reset JDBC Connection after transaction", ex);
         }
 
         if (txObject.isNewConnectionHolder()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
             }// 资源引用计数-1,关闭数据库连接
             DataSourceUtils.releaseConnection(con, this.dataSource);
         }
         // 重置连接持有者的全部属性
         txObject.getConnectionHolder().clear();
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
4.3.3 rollback回滚事务

在这里插入图片描述
AbstractPlatformTransactionManager 中 rollback 源码如下:

     public final void rollback(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
         processRollback(defStatus);
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

processRollback 源码如下:

     private void processRollback(DefaultTransactionStatus status) {
         try {
             try {// 解绑当前线程绑定的会话工厂,并关闭会话
                 triggerBeforeCompletion(status);
                 if (status.hasSavepoint()) {// 1.如果有保存点,即嵌套式事务
                     if (status.isDebug()) {
                         logger.debug("Rolling back transaction to savepoint");
                     }//回滚到保存点
                     status.rollbackToHeldSavepoint();
                 }//2.如果就是一个简单事务
                 else if (status.isNewTransaction()) {
                     if (status.isDebug()) {
                         logger.debug("Initiating transaction rollback");
                     }//回滚核心方法
                     doRollback(status);
                 }//3.当前存在事务且没有保存点,即加入当前事务的
                 else if (status.hasTransaction()) {//如果已经标记为回滚 或 当加入事务失败时全局回滚(默认true)
                     if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                         if (status.isDebug()) {//debug时会打印:加入事务失败-标记已存在事务为回滚
                             logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                         }//设置当前connectionHolder:当加入一个已存在事务时回滚
                         doSetRollbackOnly(status);
                     }
                     else {
                         if (status.isDebug()) {
                             logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                         }
                     }
                 }
                 else {
                     logger.debug("Should roll back transaction but cannot - no transaction available");
                 }
             }
             catch (RuntimeException ex) {//关闭会话,重置SqlSessionHolder属性
                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                 throw ex;
             }
             catch (Error err) {
                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                 throw err;
             }
             triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
         }
         finally {
             // 解绑当前线程
             cleanupAfterCompletion(status);
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

如上图,有几个公共方法和提交事务时一致,就不再重复

这里主要看 doRollback,DataSourceTransactionManager 的 doRollback() 源码如下:

     protected void doRollback(DefaultTransactionStatus status) {
         DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
         Connection con = txObject.getConnectionHolder().getConnection();
         if (status.isDebug()) {
             logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
         }
         try {
             con.rollback();
         }
         catch (SQLException ex) {
             throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
         }
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

好吧,一点不复杂,就是 Connection 的 rollback

4.3.4 时序图

特地整理了时序图(简单的新事务,没有画出保存点等情况)如下:
在这里插入图片描述

5 测试验证
5.1 环境
5.1.1 环境业务模拟

模拟用户去银行转账,用户A 转账给 用户B,需要保证 用户A 扣款,用户B 加款同时成功或失败回滚

5.1.2 环境准备

测试环境:mysql8+mac,测试时使用的mysql8(和mysql5.6的设置事务变量的语句不同,不用太在意)

测试准备:创建一个数据库test,创建一张表user_balance用户余额表。id主键,name姓名,balance账户余额

 mysql> create database test;
 Query OK, 1 row affected (0.05 sec)
 
 mysql> use test;
 Database changed
 mysql> CREATE TABLE `user_balance` (
     ->   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID主键',
     ->   `name` varchar(20) DEFAULT NULL COMMENT '姓名',
     ->   `balance` decimal(10,0) DEFAULT NULL COMMENT '账户余额',
     ->   PRIMARY KEY (`id`)
     -> ) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=utf8;
 Query OK, 0 rows affected, 1 warning (0.15 sec)
 
// 初始化数据,2个账户都是1000元:
mysql> INSERT INTO `user_balance` VALUES ('1', '张三', '1000'), ('2', '李四', '1000');
Query OK, 2 rows affected (0.06 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> select * from user_balance;                                              
+----+--------+---------+
| id | name   | balance |
+----+--------+---------+
|  1 | 张三   |    1000 |
|  2 | 李四   |    1000 |
+----+--------+---------+
2 rows in set (0.00 sec)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
5.2 隔离级别实测
5.2.1 隔离级别实测

通用语句:

  1. 开启/提交事务:开启:begin/start transaction都行,提交:commit;
  2. 查询事务级别:select @@transaction_isolation;
  3. 修改事务级别:set global transaction_isolation=‘read-uncommitted’;

注意:修改完了后要 exit 退出再重新连接 mysql(mysql -uroot) 才能生效(这里是模拟MySQL5.6,MySQL8有直接生效的语句)

以下4种测试都是先设置好事务隔离级别,再做的测试,下面的测试就不再展示出来了

5.2.2 Read Uncommitted(读未提交)

测试步骤:

  1. 开启 2 个会话连接 mysql,会话 1 开始事务 A,会话 2 开始事务 B
  2. 事务 A 中执行 update 把张三的余额 1000-100=900,事务A查询结果为 900
  3. 此时事务 A 并没有提交,事务 B 查询结果也是 900,即:读取了未提交的内容(MVCC快照读的最新版本号数据)

如下图(左边的是会话1-事务A,右边的是会话2-事务B)
在这里插入图片描述
总结:明显不行,因为事务 A 内部的处理数据不一定是最后的数据,很可能事务 A 后续再加上 1000,那么事务 B 读取的数据明显就错了,即脏读!

5.2.3 Read Committed(读提交)

测试步骤:

  1. 开启 2 个会话连接 mysql,会话 1 开始事务 A,会话 2 开始事务 B
  2. 事务 A 中执行 update 把张三的余额 1000-100=900,事务 A 查询结果为 900。只要事务 A 未提交,事务 B 查询数据都没有变化还是 1000
  3. 事务 A 提交,事务 B 查询立即变成 900 了,即:读已提交

如下图(左边的是会话1-事务A,右边的是会话2-事务B):
在这里插入图片描述
总结:解决了脏读问题,但此时事务 B 还没提交,即出现了在一个事务中多次查询同一 sql 数据不一致的情况,即不可重复读!

5.2.4 Repeatable Read(可重读)

测试步骤:

  1. 开启 2 个会话连接 mysql,会话 1 开始事务 A,会话 2 开始事务 B
  2. 事务 A 中执行 update 把张三的余额 1000-100=900,事务 A 查询结果为 900。事务 A 提交,事务 B 查询数据还是 1000 不变
  3. 会话 1 再开始一个事务 C 插入一条“王五”数据,并提交,事务 B 查询还是 2 条数据,且数据和第一次查询一致,即:读已提交+可重复读
  4. 会话 2 中的事务 B 也插入一条相同 ID 的数据,报错:已经存在相同 ID=3 的数据插入失败!即出现了幻读

如下图:
在这里插入图片描述
mysql 支持的解决方案:

要防止幻读,可以事务 A 中 for update 加上范围,最终会生成间隙锁,阻塞其它事务插入数据,并且当事务 A 提交后,事务 B 立即可以插入成功
在这里插入图片描述

5.2.5 Serializable(可串行化)

测试步骤:

  1. 开启 2 个会话连接 mysql,会话 1开始事务 A,会话 2 开始事务 B
  2. 事务 A,查询 id=2 的记录,事务 B 更新 id=2 的记录,update 操作被阻塞一直到超时(事务A提交后,事务B update可以立即执行)

如下图(左边的是会话1-事务A,右边的是会话2-事务B):
在这里插入图片描述
结论:Serializable 级别下,读也加锁!如果是行锁(查询一行),那么后续对这一行的修改操作会直接阻塞等待第一个事务完毕。如果是表锁(查询整张表),那么后续对这张表的所有修改操作都阻塞等待。可见仅仅一个查询就锁住了相应的查询数据,性能实在是不敢恭维

5.3 传播机制实测
5.3.1 测试准备

环境:spring4+mybatis+mysql+slf4j+logback,注意:日志logback要配置:日志打印为debug级别,这样才能看见事务过程
测试代码:

  // 测试基类:BaseTest
  @Slf4j
  @RunWith(SpringRunner.class)
  @SpringBootTest(classes = StudyDemoApplication.class)
  public class BaseTest {

  }

  // 测试子类:UserBalanceTest
  /**
   * @Description 用户余额测试类(事务)
   * @author denny
   * @date 2018/9/4 上午11:38
   */ 
  public class UserBalanceTest extends BaseTest{
 
     @Resource
     private UserBalanceService userBalanceService;
 
     @Test
     public void testAddUserBalanceAndUser(){
         userBalanceService.addUserBalanceAndUser("赵六",new BigDecimal(1000));
     }
     
     public static void main(String[] args) {
         
     }
     
  }

  // UserBalanceImpl
  @Slf4j
  @Service
  public class UserBalanceImpl implements UserBalanceService {
 
      @Resource
     private UserService userService;
     @Resource
     private UserBalanceRepository userBalanceRepository;
 
     /**
      * 创建用户
      *
      * @param userBalance
      * @return
      */
     @Override
     public void addUserBalance(UserBalance userBalance) {
         this.userBalanceRepository.insert(userBalance);
     }
 
     /**
      * 创建用户并创建账户余额
      *
      * @param name
      * @param balance
      * @return
      */
     @Transactional(propagation= Propagation.REQUIRED, rollbackFor = Exception.class)
     @Override
     public void addUserBalanceAndUser(String name, BigDecimal balance) {
         log.info("[addUserBalanceAndUser] begin!!!");
         //1.新增用户
         userService.addUser(name);
         //2.新增用户余额
         UserBalance userBalance = new UserBalance();
         userBalance.setName(name);
         userBalance.setBalance(new BigDecimal(1000));
         this.addUserBalance(userBalance);
         log.info("[addUserBalanceAndUser] end!!!");
     }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

如上图所示:

addUserBalanceAndUser(){
  addUser(name);//添加用户
  addUserBalance(userBalance);//添加用户余额
}
  • 1
  • 2
  • 3
  • 4

addUserBalanceAndUser 开启一个事务,内部方法 addUser 也申明事务,如下:

UserServiceImpl:

  @Slf4j
  @Service
  public class UserServiceImpl implements UserService{
     @Resource
     private UserRepository userRepository;
 
     @Transactional(propagation= Propagation.REQUIRED, rollbackFor = Exception.class)
     @Override
     public void addUser(String name) {
         log.info("[addUser] begin!!!");
         User user = new User();
         user.setName(name);
         userRepository.insert(user);
         log.info("[addUser] end!!!");
     }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
5.3.2 实测
5.3.2.1 REQUIRED

如果当前没有事务,就创建一个新事务,如果当前存在事务,就加入该事务。默认选择

外部方法,内部方法都是 REQUIRED:
在这里插入图片描述
如上图所示:外部方法开启事务,由于不存在事务,Registering 注册一个新事务;内部方法 Fetched 获取已经存在的事务并使用,符合预期

5.3.2.2 SUPPORTS

支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就以非事务执行
外部方法 required,内部 SUPPORTS
在这里插入图片描述
如上图,外部方法创建一个事务,传播机制是 required,内部方法 Participating in existing transaction 即加入已存在的外部事务,并最终一起提交事务,符合预期

5.3.2.3 MANDATORY

支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就抛出异常

外部没有事务,内部 MANDATORY:
在这里插入图片描述
如上图,外部没有事务,内部 MANDATORY,报错,符合预期

5.3.2.4 REQUIRES_NEW

创建新事务,如果存在当前事务,则挂起当前事务。新事务执行完毕后,再继续执行老事务

外部方法 REQUIRED,内部方法 REQUIRES_NEW
在这里插入图片描述
如上图,外部方法 REQUIRED 创建新事务,内部方法 REQUIRES_NEW 挂起老事务,创建新事务,新事务完毕后,唤醒老事务继续执行。符合预期

5.3.2.5 NOT_SUPPORTED

以非事务方式执行操作,如果当前存在事务,就把当前事务挂起

外部方法 REQUIRED,内部方法 NOT_SUPPORTED
在这里插入图片描述
如上图,外部方法创建事务 A,内部方法不支持事务,挂起事务 A,内部方法执行完毕,唤醒事务 A 继续执行。符合预期

5.3.2.6 NEVER

NEVER:以非事务方式执行,如果当前存在事务,则抛出异常。

外部方法 REQUIRED,内部方法 NEVER:
在这里插入图片描述
如上图,外部方法 REQUIRED 创建事务,内部方法 NEVER 如果当前存在事务报错,符合预期

5.3.2.7 NESTED

如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与 REQUIRED 类似的操作

外部方法 REQUIRED,内部方法NESTED
在这里插入图片描述
如上图,外部方法 REQUIRED 创建事务,内部方法 NESTED 构造一个内嵌事务并创建保存点,内部事务运行完毕释放保存点,继续执行外部事务。最终和外部事务一起 commit。上图只有一个 sqlSession 对象,commit 时也是一个。符合预期

注意:NESTED 和 REQUIRES_NEW 区别?

  1. 回滚:NESTED 在创建内层事务之前创建一个保存点,内层事务回滚只回滚到保存点,不会影响外层事务。外层事务回滚则会连着内层事务一起回滚;REQUIRES_NEW 构造一个新事务,和外层事务是两个独立的事务,互不影响
  2. 提交:NESTED 是嵌套事务,是外层事务的子事务。外层事务 commit 则内部事务一起提交,只有一次 commit;REQUIRES_NEW 是新事务,完全独立的事务,独立进行 2 次 commit

强烈注意:
NESTED 嵌套事务能够自己回滚到保存点,但是嵌套事务方法中的上抛的异常,外部方法也能捕获,那么外部事务也就回滚了,所以如果期望实现内部嵌套异常回滚不影响外部事务,那么需要捕获嵌套事务的异常。如下:

 @Transactional(propagation= Propagation.REQUIRED, rollbackFor = Exception.class)
    @Override
    public void addUserBalanceAndUser(String name, BigDecimal balance) {
         log.info("[addUserBalanceAndUser] begin!!!");
         //1.新增用户余额--》最终会插入成功,不受嵌套回滚异常影响
         UserBalance userBalance = new UserBalance();
         userBalance.setName(name);
         userBalance.setBalance(new BigDecimal(1000));
         this.addUserBalance(userBalance);
         //2.新增用户,这里捕获嵌套事务的异常,不让外部事务获取到,不然外部事务肯定会回滚!
         try{
             // 嵌套事务@Transactional(propagation= Propagation.NESTED, rollbackFor = Exception.class)--》异常会回滚到保存点
             userService.addUser(name);
         }catch (Exception e){
             // 这里可根据实际情况添加自己的业务!
             log.error("嵌套事务【addUser】异常!",e);
         }
 
         log.info("[addUserBalanceAndUser] end!!!");
     }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/405438
推荐阅读
相关标签
  

闽ICP备14008679号