赞
踩
参考文章:
前文:
写在开头:本文为个人学习笔记,内容比较随意,夹杂个人理解,如有错误,欢迎指正。
在前文中我们介绍了Spring事务管理是如何开启以及创建代理类对象的,本文我们来深入解析下代理类是如何实现事务的管控。
目录
2、createTransactionIfNecessary
在以前讲AOP的时候我们介绍过在执行代理类的目标方法时,会调用Advisor的getAdvice方法获取MethodInterceptor接口实现类并执行其invoke方法。前一篇文章中我们看到了BeanFactoryTransactionAttributeSourceAdvisor的adviceBeanName便是TransactionInterceptor,该类继承了MethodInterceptor接口。
- RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
- String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
-
-
- RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
- advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
- parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
我们从TransactionInterceptor类的invoke方法开始分析,首先获取被代理类,然后调用invokeWithinTransaction方法,该方法内部将会把被代理类的方法纳入事务中。
- @Override
- public Object invoke(final MethodInvocation invocation) throws Throwable {
- Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
- // 在事务中执行被代理类的方法
- return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
- @Override
- public Object proceedWithInvocation() throws Throwable {
- return invocation.proceed();
- }
- });
- }
invokeWithinTransaction方法首先获取了事务属性和事务管理器,然后针对声明式事务与编程式事务进行了区分处理,我们这里只对声明式事务进行分析。
不同的事务处理方式使用不同的逻辑。对于声明式事务的处理与编程式事务的处理,第一点区别在于事务属性上,因为声明式事务处理是可以没有事务属性的,第二点区别就是在TransactionManager上,CallbackPreferringPlatformTransactionManager实现PlatformTransactionManager接口,暴露出一个方法用于执行事务处理中的回调。所以,这两种方式都可以用作事务处理方式的判断。
- protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
- throws Throwable {
- // 获取事务属性
- final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
- // 获取beanFactory中的transactionManager
- final PlatformTransactionManager tm = determineTransactionManager(txAttr);
- final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
- // 声明式事务处理
- if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
- ...
- }
- else {
- // 编程式事务处理
- try {
- ...
- }
- catch (ThrowableHolderException ex) {
- throw ex.getCause();
- }
- }
- }
声明式事务的处理过程中,首先会创建事务,即开启事务(这里会返回一个TransactionInfo用来保存事务的数据,便于后续流程中回滚与提交),然后调用被代理类中的方法,方法执行完后根据执行结果判断是回滚还是提交事务。注意无论是回滚还是提交最后都需要将事务信息清空。
- final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
- final PlatformTransactionManager tm = determineTransactionManager(txAttr);
- final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
- // 声明式事务处理
- if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
- // 创建事务,事务属性等信息会被保存进TransactionInfo中
- TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
- Object retVal;
- try {
- // 执行目标方法
- retVal = invocation.proceedWithInvocation();
- }
- catch (Throwable ex) {
- // 异常处理
- completeTransactionAfterThrowing(txInfo, ex);
- throw ex;
- }
- finally {
- // 清空事务信息
- cleanupTransactionInfo(txInfo);
- }
- // 提交事务
- commitTransactionAfterReturning(txInfo);
- return retVal;
- }
首先调用getTransaction方法,该方法内部将会判断是否需要开启事务并绑定线程与数据库连接,是非常重要的过程,我们下面会做详细介绍。
然后调用prepareTransactionInfo方法根据事务属性与status准备一个TransactionInfo。这是因为当已经建立事务连接并完成了事务信息的提取后,我们需要将所有的事务信息统一记录在TransactionInfo类型的实例中,这个实例包含了目标方法开始前的所有状态信息,一旦事务执行失败,Spring会通过TransactionInfo类型的实例中的信息来进行回滚等后续工作。
- protected TransactionInfo createTransactionIfNecessary(
- PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
- // 如果事务属性没有名称指定则使用方法唯一标识,并使用DelegatingTransactionAttribute封装txAttr
- if (txAttr != null && txAttr.getName() == null) {
- txAttr = new DelegatingTransactionAttribute(txAttr) {
- @Override
- public String getName() {
- return joinpointIdentification;
- }
- };
- }
- TransactionStatus status = null;
- if (txAttr != null) {
- if (tm != null) {
- // 获取事务状态TransactionStatus
- status = tm.getTransaction(txAttr);
- }
- else {
- if (logger.isDebugEnabled()) {
- logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
- "] because no transaction manager has been configured");
- }
- }
- }
- // 构建事务信息,根据指定的属性与status构建一个TransactionInfo
- return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
- }
首先根据doGetTransaction获取当前事务,然后调用isExistingTransaction判断是否存在事务,如果已经存在则进入handleExistingTransaction方法进行处理。
如果不存在事务属性为REQUIRED、REQUIRES_NEW、NESTED则表明需要开启新事务,将会先将空事务挂起(之所以没有原事务也挂起是为了后续回滚或提交时做判断),然后doBegin开启事务,并调用prepareSynchronization将新事务的信息保存到当前线程中。
如果都不是(例如SUPPORTS)则创建一个DefaultTransactionStatus。
- public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
- // 获取事务
- Object transaction = doGetTransaction();
- boolean debugEnabled = logger.isDebugEnabled();
- if (definition == null) {
- definition = new DefaultTransactionDefinition();
- }
- // 判断当前线程是否已经存在事务
- if (isExistingTransaction(transaction)) {
- // 处理已经存在的事务
- return handleExistingTransaction(definition, transaction, debugEnabled);
- }
- // 事务超时设置验证
- if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
- throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
- }
- // 如果当前线程不存在事务并且事务传播行为为 MANDATORY 报错
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
- throw new IllegalTransactionStateException(
- "No existing transaction found for transaction marked with propagation 'mandatory'");
- }
- // 这三种事务传播行为需要开启新事务
- else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
- definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
- definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
- // 挂起原事务,因为这里不存在原事务因此是null
- SuspendedResourcesHolder suspendedResources = suspend(null);
- if (debugEnabled) {
- logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
- }
- try {
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- DefaultTransactionStatus status = newTransactionStatus(
- definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
- // 开启事务
- doBegin(transaction, definition);
- // 同步事务状态
- prepareSynchronization(status, definition);
- return status;
- }
- catch (RuntimeException ex) {
- resume(null, suspendedResources);
- throw ex;
- }
- catch (Error err) {
- resume(null, suspendedResources);
- throw err;
- }
- }
- else {
- if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
- logger.warn("Custom isolation level specified but no actual transaction initiated; " +
- "isolation level will effectively be ignored: " + definition);
- }
- boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
- return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
- }
- }
doGetTransaction方法可以获取事务对象
- protected Object doGetTransaction() {
- DataSourceTransactionObject txObject = new DataSourceTransactionObject();
- txObject.setSavepointAllowed(isNestedTransactionAllowed());
- // this.dataSource即数据源,判断当前线程如果已经记录数据库连接则使用原有连接
- ConnectionHolder conHolder =
- (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
- // false表示非新创建连接
- txObject.setConnectionHolder(conHolder, false);
- return txObject;
- }
这里的this.dataSource是我们在设置DataSourceTransactionManager时传入的(回顾下前文的事务配置)
- <tx:annotation-driven transaction-manager="transactionManager"/>
- <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
- <property name="dataSource" ref="dataSource"/>
- </bean>
- public DataSourceTransactionManager(DataSource dataSource) {
- this();
- // 设置数据源
- setDataSource(dataSource);
- afterPropertiesSet();
- }
TransactionSynchronizationManager持有一个ThreadLocal的实例,其中存放了一个Map,该Map的key为datasource,value为ConnectionHolder。
- // 用 ThreadLocal 来存放 ConnectionHolder 信息
- private static final ThreadLocal<Map<Object, Object>> resources =
- new NamedThreadLocal<>("Transactional resources");
-
- // 获取ConnectionHolder
- public static Object getResource(Object key) {
- // 包装下传入的 key
- Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
- // 在这里获取连接信息
- Object value = doGetResource(actualKey);
- return value;
- }
-
- // 具体的获取操作
- private static Object doGetResource(Object actualKey) {
- // 从ThreadLocal中获取
- Map<Object, Object> map = resources.get();
- if (map == null) {
- return null;
- }
- Object value = map.get(actualKey);
- if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
- map.remove(actualKey);
- if (map.isEmpty()) {
- resources.remove();
- }
- value = null;
- }
- return value;
- }
这个ConnectionHolder可以简单地将其理解为Connection(数据库连接)的包装类,其中最重要的属性就是Connection了。
- public class ConnectionHolder extends ResourceHolderSupport {
- // 当前数据库连接
- private final Connection connection;
- public ConnectionHolder(Connection connection) {this.connection = connection;}
- public Connection getConnection() {return this.connection;}
- }
判断当前线程是否存在事务,判读依据为当前线程记录的连接不为空且连接中(connectionHolder)中的transactionActive属性不为空。
- @Override
- protected boolean isExistingTransaction(Object transaction) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
- return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
- }
suspend方法对有无同步的事务采取了不同的方案,但真正挂起事务都是使用的doSuspend
- protected final SuspendedResourcesHolder suspend(@Nullable Object transaction)
- throws TransactionException {
- // 如果有同步的事务,则优先挂起同步的事务
- if (TransactionSynchronizationManager.isSynchronizationActive()) {
- List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
- try {
- Object suspendedResources = null;
- if (transaction != null) {
- // 挂起操作
- suspendedResources = doSuspend(transaction);
- }
- // 重置事务名称
- String name = TransactionSynchronizationManager.getCurrentTransactionName();
- TransactionSynchronizationManager.setCurrentTransactionName(null);
- // 重置只读状态
- boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
- TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
- // 重置隔离级别
- Integer isolationLevel = TransactionSynchronizationManager
- .getCurrentTransactionIsolationLevel();
- TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
- // 重置事务激活状态
- boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
- TransactionSynchronizationManager.setActualTransactionActive(false);
- // 返回挂起的事务
- return new SuspendedResourcesHolder(
- suspendedResources, suspendedSynchronizations, name, readOnly,
- isolationLevel, wasActive);
- }
- catch (RuntimeException | Error ex) {
- doResumeSynchronization(suspendedSynchronizations);
- throw ex;
- }
- }
- else if (transaction != null) {
- Object suspendedResources = doSuspend(transaction);
- return new SuspendedResourcesHolder(suspendedResources);
- }
- else {
- return null;
- }
- }
可以看出挂起操作主要是将当前connectionHolder置为null,返回原有事务信息,以便于后续恢复原有事务,并将当前正在进行的事务信息进行重置。
- protected Object doSuspend(Object transaction) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
- txObject.setConnectionHolder(null);
- // 解除绑定
- return TransactionSynchronizationManager.unbindResource(this.dataSource);
- }
继续进入TransactionSynchronizationManager.unbindResource来看解绑的操作,其实就是将现有的事务信息remove并返回上一级以便保存。
- // 解除绑定操作
- public static Object unbindResource(Object key) throws IllegalStateException {
- Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
- // 继续调用
- Object value = doUnbindResource(actualKey);
- if (value == null) {
- throw new IllegalStateException(...);
- }
- return value;
- }
-
- // 解除绑定操作
- private static Object doUnbindResource(Object actualKey) {
- Map<Object, Object> map = resources.get();
- if (map == null) {
- return null;
- }
- // 移除资源
- Object value = map.remove(actualKey);
- if (map.isEmpty()) {
- resources.remove();
- }
- if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
- value = null;
- }
- return value;
- }
这里我们看到了数据库连接的获取,如果是新事务需要获取新一个新的数据库连接,并为其设置了隔离级别、是否只读等属性。
- protected void doBegin(Object transaction, TransactionDefinition definition) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
- Connection con = null;
- try {
- // 新事务开启时将ConnectionHolder置为了null
- if (txObject.getConnectionHolder() == null ||
- txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
- Connection newCon = this.dataSource.getConnection(); // 获取新的数据库连接
- if (logger.isDebugEnabled()) {
- logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
- }
- txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
- }
- txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
- con = txObject.getConnectionHolder().getConnection();
- // 设置事务隔离级别和readOnly属性
- Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
- txObject.setPreviousIsolationLevel(previousIsolationLevel);
- if (con.getAutoCommit()) {
- txObject.setMustRestoreAutoCommit(true);
- if (logger.isDebugEnabled()) {
- logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
- }
- // 设置由Spring控制事务提交
- con.setAutoCommit(false);
- }
- prepareTransactionalConnection(con, definition);
- // 设置当前线程的事务激活状态
- txObject.getConnectionHolder().setTransactionActive(true);
- int timeout = determineTimeout(definition);
- if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
- // 设置超时时间
- txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
- }
- if (txObject.isNewConnectionHolder()) {
- TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
- }
- }
- catch (Throwable ex) {
- if (txObject.isNewConnectionHolder()) {
- DataSourceUtils.releaseConnection(con, this.dataSource);
- txObject.setConnectionHolder(null, false);
- }
- throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
- }
- }
将事务信息记录到当前线程中
- protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
- if (status.isNewSynchronization()) {
- TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
- TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
- definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
- definition.getIsolationLevel() : null);
- TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
- TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
- TransactionSynchronizationManager.initSynchronization();
- }
- }
如果不需要开启事务(例如SUPPORTS)则返回一个默认的status对象。
- protected final DefaultTransactionStatus prepareTransactionStatus(
- TransactionDefinition definition, Object transaction, boolean newTransaction,
- boolean newSynchronization, boolean debug, Object suspendedResources) {
-
- DefaultTransactionStatus status = newTransactionStatus(
- definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
- prepareSynchronization(status, definition);
- return status;
- }
-
- protected DefaultTransactionStatus newTransactionStatus(
- TransactionDefinition definition, Object transaction, boolean newTransaction,
- boolean newSynchronization, boolean debug, Object suspendedResources) {
-
- boolean actualNewSynchronization = newSynchronization &&
- !TransactionSynchronizationManager.isSynchronizationActive();
-
- // 创建DefaultTransactionStatus对象
- return new DefaultTransactionStatus(
- transaction, newTransaction, actualNewSynchronization,
- definition.isReadOnly(), debug, suspendedResources);
- }
handleExistingTransaction方法针对不同的传播行为做不同的处理方法,比如挂起原事务开启新事务等。
- private TransactionStatus handleExistingTransaction(
- TransactionDefinition definition, Object transaction, boolean debugEnabled)
- throws TransactionException {
- // 当传播方式为NEVER时抛出异常
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
- throw new IllegalTransactionStateException(
- "Existing transaction found for transaction marked with propagation 'never'");
- }
- // 当传播方式为NOT_SUPPORTED时挂起当前事务,然后在无事务的状态中运行
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
- if (debugEnabled) {
- logger.debug("Suspending current transaction");
- }
- // 挂起事务
- Object suspendedResources = suspend(transaction);
- boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
- // 返回默认status
- return prepareTransactionStatus(
- definition, null, false, newSynchronization, debugEnabled, suspendedResources);
- }
- // 当传播方式为REQUIRES_NEW时挂起当前事务,然后启动新的事务
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
- if (debugEnabled) {
- logger.debug("Suspending current transaction, creating new transaction with name [" +
- definition.getName() + "]");
- }
- // 挂起原事务
- SuspendedResourcesHolder suspendedResources = suspend(transaction);
- try {
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- DefaultTransactionStatus status = newTransactionStatus(
- definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
- // 启动新的事务
- doBegin(transaction, definition);
- prepareSynchronization(status, definition);
- return status;
- }
- catch (RuntimeException beginEx) {
- resumeAfterBeginException(transaction, suspendedResources, beginEx);
- throw beginEx;
- }
- catch (Error beginErr) {
- resumeAfterBeginException(transaction, suspendedResources, beginErr);
- throw beginErr;
- }
- }
- // 当传播方式为NESTED时, 设置事务的保存点
- // 存在事务,将该事务标注保存点,形成嵌套事务。
- // 嵌套事务中的子事务出现异常不会影响到父事务保存点之前的操作。
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
- if (!isNestedTransactionAllowed()) {
- throw new NestedTransactionNotSupportedException(
- "Transaction manager does not allow nested transactions by default - " +
- "specify 'nestedTransactionAllowed' property with value 'true'");
- }
- if (debugEnabled) {
- logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
- }
- if (useSavepointForNestedTransaction()) {
- DefaultTransactionStatus status =
- prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
- // 创建保存点,回滚时只回滚到该保存点
- status.createAndHoldSavepoint();
- return status;
- }
- else {
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- DefaultTransactionStatus status = newTransactionStatus(
- definition, transaction, true, newSynchronization, debugEnabled, null);
- // 如果不支持保存点,就启动新的事务
- doBegin(transaction, definition);
- prepareSynchronization(status, definition);
- return status;
- }
- }
- // 其余代码
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
- }
到这里我们就分析完了事务的创建工作,下一步就是被代理类方法的执行,执行完之后会根据结果判断是回滚还是提交,这部分我们会在下文进行讲解。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。