赞
踩
Spring事务(2)-EnableTransactionManagement实现源码解析 中介绍了Spring事务开启和代理的实现,现在了解实际事务执行TransactionInterceptor
。
MethodInterceptor
:AOP代理后方法执行拦截,例如切面中Before、After、AfterReturn、AfterThrow、Around处理实现。TransactionAspectSupport
:事务性方面的基类,获取当前事务、事务状态、PlatformTransactionManager等信息。 /**
* 此方法是事务执行的入口
*/
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
//计算出目标类型,就是被代理的类,可能是null。TransactionAttributeSource应该通过目标方法和目标类得到
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 执行事务逻辑
//调用父类的invokeWithinTransaction方法实现事务的逻辑
//在方法执行成功之后,将会执行回调函数,这里的逻辑就是继续向后执行invocation的proceed方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
执行流程:
transactionAttributeSource
属性,这个属性源在创建bean定义的时候就被设置了。getTransactionAttribute
方法获取适用于该方法的事务属性,如果TransactionAttribute
为null,则该方法为非事务方法。对于AnnotationTransactionAttributeSourceTransactionAttribute
为RuleBasedTransactionAttribute
;@Nullable protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { /* * TODO 其中事务传播行为的特点(这些特点,仔细分析代码是可以看出来的。一般先了解特点,带着问题再看源码会容易些): * PROPAGATION_REQUIRED 如果当前存在事务,假如当前事务。如果不存在事务,新建事务。 * PROPAGATION_REQUIRES_NEW 如果当前存在事务,则挂起当前事务,新建一个事务。如果不存在事务,新建一个事务。 * PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前不存在事务,则和PROPAGATION_REQUIRED一样新建事务。使用频率高 * 特点:外围事务回滚,嵌套事务全部回滚。嵌套事务回滚,如果在外围中捕获了,则仅仅回滚嵌套事务。 * PROPAGATION_MANDATORY 以事务方式运行,如果当前不存在事务,则抛出异常 * PROPAGATION_NEVER 以非事务方式运行,如果当前存在事务,则抛出异常 * PROPAGATION_SUPPORTEDS 支持事务。如果当前存在事务,加入当前事务。如果不存在事务,则以非事务方式运行。 * PROPAGATION_NOT_SUPPORTED 以非事务方式运行。如果当前存在事务,挂起当前事务。 */ /** * 1 获取事务属性源,也就是transactionAttributeSource属性,这个属性源在创建bean定义的时候就被设置了。 */ // 事务属性的获取 //基于注解的属性源是AnnotationTransactionAttributeSource //基于XML标签的属性源是NameMatchTransactionAttributeSource // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); /* * 2 调用属性源的getTransactionAttribute方法获取适用于该方法的事务属性,如果TransactionAttribute为null,则该方法为非事务方法。 * * 对于AnnotationTransactionAttributeSource,它的getTransactionAttribute * 对于NameMatchTransactionAttributeSource,它的getTransactionAttribute就是通过方法名进行匹配。 */ final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); /* * 3 确定给定事务属性所使用的特定事务管理器。 */ final PlatformTransactionManager tm = determineTransactionManager(txAttr); //获取方法的信息,也就是此前设置的DefaultTransactionAttribute中的descriptor,或者方法的全路径类名,主要用于记录日志 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); /* * 如果事务属性为null,或者获取事务管理器不是回调事务管理器,那么走下面的逻辑 * 这是最常见的标准声明式事务的逻辑,比如DataSourceTransactionManager */ if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { /* * 5 根据给定的事务属性、事务管理器、方法连接点描述字符串(全限定方法名)信息创建一个事务信息对象 * 将会解析各种事务属性,应用不同的流程,比如创建新事物、加入事务、抛出异常等 */ TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { /* * 6 invocation参数传递的是一个lambda表达式,传递的是一个方法调用invocation::proceed * 因此这里的proceedWithInvocation实际上是执行invocation的proceed方法 * 即这里将会继续向后调用链中的下一个拦截器,最后还会导致目标方法的调用(执行业务逻辑),然后会倒序返回 */ retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { /* * 7 处理执行过程中抛出的异常,可能会有事务的回滚或者提交 */ completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // TODO 清除事务信息,将当前事务清除,如果存在旧的事务对象,将旧的事务对象设置为当前持有的事务 // 存储在TransactionAspectSupport.transactionInfoHolder中,这是一个静态TrheadLocal常量。 cleanupTransactionInfo(txInfo); } // TODO 提交事务 commitTransactionAfterReturning(txInfo); return retVal; } /* * 如果txAttr不为null并且事务管理器属于CallbackPreferringPlatformTransactionManager,走下面的逻辑 * 只有使用WebSphereUowTransactionManager时才会走这个逻辑,不常见,甚至几乎见不到 */ else { final ThrowableHolder throwableHolder = new ThrowableHolder(); // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } } }
判断是否需要创建事务。
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // 如果未指定事务名称,则使用当前方法的全路径名作为事务的名称。 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // TODO 重点:获取TransactionStatus对象 // 无论被代理对象的方法是否需要事务,在通过代理对象调用方法时,总会创建一个TransactionStatus,讲一些信息绑定到 // ThreadLocal的缓存变量中。这是为了在执行完方法的时候,可以回溯到调用方法存在的事务对象。类似责任链。 // 如果挂起了存在的事务,则当前方法执行的事务状态对象中会存储SuspendedResourcesHolder,及挂起的事务信息, // 当方法执行完后,将这个挂起从新设置到当前事务中来。 status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } // TODO 重点:将上面获取到的对象封装成TransactionInfo对象 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
根据当前事务和事务的传播性判断使用当前事务、创建新事务、挂起事务。
@Override public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { /* * 获取当前事务,该方法由具体的事务管理器子类自己实现,比如DataSourceTransactionManager、JtaTransactionManager * 一般都使用DataSourceTransactionManager这个事务管理器 * * 对于DataSourceTransactionManager这里获取的实际上是一个数据库的事务连接对象,即DataSourceTransactionObject */ Object transaction = doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } // 判断是否已经存在事务。如果当前事务对象已经持有了链接,并且事务是活跃状态,则表示已经存在事务。 // 如果当前已经存在事务,按照已存在事务的方式判断传播行为,进行处理 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. // TODO 如果当前已存在事务,会走到此方法逻辑进行处理。 return handleExistingTransaction(definition, transaction, debugEnabled); } // TODO 以下是针对前一个方法未开启事务的情况的处理。 // 检测事务的超时时间设置是否正确 // Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // 如果传播属性是PROPAGATION_MANDATORY,且当前不存在事务,则抛出异常 // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); }// 如果传播属性是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED时,会走到这里 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 因为以上三种事务传播属性的特性,这里会挂起(如果当前存在事务) SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 创建事务状态对象 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // TODO 重点:开启事务。按照当前测试代码注册的事务管理器,使用的是DataSourceTransactionManager doBegin(transaction, definition); // 如果是新开启的事务,设置事务管理器相关属性。其实走到这里的代码,一定是新启的事务。 // 因为前面已经判断了,如果存在事务,就走handleExistingTransaction方法 prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // 以非事务方式运行 NEVER SUPPORTS NOT_SUPPORTED,在外围未开启事务时,会走这里。 // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } } /** * 为现有事务创建一个TransactionStatus * Create a TransactionStatus for an existing transaction. */ private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // 如果当前方法传播属性为从不开启事务,直接抛出异常。 // PROPAGATION_NEVER特性决定,如果当前存在事务,则抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // PROPAGATION_NOT_SUPPORTED特性:如果当前存在事务,挂起当前事务,以非事务方式运行 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } // 挂起事务 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 返回事务状态对象 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // TODO 重点:PROPAGATION_REQUIRES_NEW特性:无论当前是否存在事务,都新建一个事务。 // 如果存在事务,将会把当前事务挂起,被挂起的事务会存储到新的事务状态对象中。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } // 挂起当前事务,由于PROPAGATION_REQUIRES_NEW的特性,需要使用新的事务,所以要将当前事务挂起,当新的事务执行完毕时,会恢复这个挂起的事务。 SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // TODO 可以看到,这里创建事务对象时,构造函数中的参数newTransaction为true。 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // PROPAGATION_REQUIRES_NEW表示总是新启一个事务,这里会新开启事务。 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } // TODO PROPAGATION_NESTED嵌套事务,如果存在事务,则在嵌套事务中运行。如果不存在事务,则新建一个事务。 // 嵌套事务是用savePoint来实现的。具体过程: // 1.新建事务状态对象时,会设置一个保存点(或者叫回滚点)。 // 2.如果业务方法执行抛出异常,则会被事务切面捕获,如果存在保存点,则会回滚值保存点。 // 3.正常执行完业务方法,如果这个方法对应的事务状态对象中具有保存点,则会擦除这个保存点。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } // 是否使用保存点,在测试中用到的事务管理器来看,总是返回true if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // 创建savePoint,保存点(或者叫回滚点) status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } // 这里是一些校验代码 if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 创建事务状态对象,如果是新创建的事务,则设置事务管理器的相关属性。 return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
根据TransactionAttribute
属性和状态对象准备TransactionInfo
。
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) { // 创建事务信息对象,如果方法是具有事务属性的,将事务状态对象封装到事务信息对象中。 TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } //那么设置事务状态,这里就表示为当前方法创建了一个事务 txInfo.newTransactionStatus(status); } else { if (logger.isTraceEnabled()) { logger.trace("Don't need to create transaction for [" + joinpointIdentification + "]: This method isn't transactional."); } } // 将事务信息对象绑定到当前线程。这时会将旧的事务信息对象值,暂存到另一个变量 oldTransactionInfo 中。 txInfo.bindToThread(); return txInfo; }
出现异常时,事务处理方法。
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { // 如果存在事务状态对象 if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } // 判断事务属性不为空并且满足回滚规则,就进行回滚,否则进行事务提交 if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { // TODO 重点:具体的回滚代码 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } } else { // We don't roll back on this exception. // Will still roll back if TransactionStatus.isRollbackOnly() is true. try { // 如果抛出的异常,与回滚的异常定义不匹配,则提交事务。也就是说,抛出异常了,也不一定回滚。它存在一个异常类型匹配。 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; } } } } /** * RuleBasedTransactionAttribute#rollbackOn 方法 * * 采用"Winning rule"机制来判断对当前异常是否需要进行回滚 * 这是最简单的规则,即选择异常的继承层次结构中最接近当前异常的规则。 * 如果需要回滚,则返回true,否则返回false * * @see TransactionAttribute#rollbackOn(java.lang.Throwable) */ @Override public boolean rollbackOn(Throwable ex) { if (logger.isTraceEnabled()) { logger.trace("Applying rules to determine whether transaction should rollback on " + ex); } //rollbackRules中最匹配的回滚规则,默认为null RollbackRuleAttribute winner = null; //回滚规则匹配成功时的匹配异常栈深度,用来查找最匹配的那一个回滚规则 int deepest = Integer.MAX_VALUE; //如果rollbackRules回滚规则集合不为null,那么判断回滚规则是否匹配 if (this.rollbackRules != null) { for (RollbackRuleAttribute rule : this.rollbackRules) { // 递归调用异常的父类,如果异常名称中包含规则中封装的异常名称,则返回。 int depth = rule.getDepth(ex); // 如果不满足规则,会返回-1,如果满足了规则,会把这个规则赋值到winner临时变量中。 if (depth >= 0 && depth < deepest) { //那么deepest赋值为当前异常栈深度,即找到最匹配的那一个 deepest = depth; //winner设置为当前回滚规则实例 winner = rule; } } } if (logger.isTraceEnabled()) { logger.trace("Winning rollback rule is: " + winner); } // 如果没有配置回滚异常或不回滚异常,则会走默认的。默认回滚运行时异常及Error: // (ex instanceof RuntimeException || ex instanceof Error); // User superclass behavior (rollback on unchecked) if no rule matches. if (winner == null) { logger.trace("No relevant rollback rule found: applying default rules"); return super.rollbackOn(ex); } // 判断规则如果是不回滚规则,则返回false,表示不需要回滚。 // 判断规则如果不是不回滚规则,返回true,表示需要回滚 return !(winner instanceof NoRollbackRuleAttribute); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。