赞
踩
目录
三、createTransactionIfNecessary()创建事务
四、invocation.proceedWithInvocation()执行增强方法
五、commitTransactionAfterReturning(txInfo)提交事务
六、completeTransactionAfterThrowing(txInfo, ex)异常回滚事务
通过前面一篇文章,我们已经了解了AOP切面织入生成代理对象的过程,当Bean方法通过代理对象调用时,会触发对应的AOP增强拦截器,前面提到声明式事务是一种环绕增强,对应接口为MethodInterceptor,事务增强对该接口的实现为TransactionInterceptor。这里我们以JDK动态代理为例,分析一下Spring事务代理调用的整体过程。
执行userService.insert()方法,实际上是执行的JdkDynamicAopProxy#invoke()方法:
- // org.springframework.aop.framework.JdkDynamicAopProxy.invoke
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- Object oldProxy = null;
- boolean setProxyContext = false;
-
- // 1.获取到目标对象,也就是被代理对象的引用
- TargetSource targetSource = this.advised.targetSource;
- Object target = null;
-
- try {
- // 如果目标对象未实现equals()方法,则不需要代理
- if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
- // The target does not implement the equals(Object) method itself.
- return equals(args[0]);
- }
- // 如果目标对象未实现hashCode()方法,则不需要代理
- else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
- // The target does not implement the hashCode() method itself.
- return hashCode();
- }
- // 如果方法是DecoratingProxy类型,也不需要拦截器执行
- else if (method.getDeclaringClass() == DecoratingProxy.class) {
- // There is only getDecoratedClass() declared -> dispatch to proxy config.
- return AopProxyUtils.ultimateTargetClass(this.advised);
- }
- // 如果是Advised接口或者其父接口中定义的方法,则直接反射调用,不应用通知
- else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
- method.getDeclaringClass().isAssignableFrom(Advised.class)) {
- // Service invocations on ProxyConfig with the proxy config...
- return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
- }
-
- Object retVal;
-
- // 2. 如果exposeProxy属性值为true,则将代理对象暴露到ThreadLocal中
- // exposeProxy是通过注解@EnableAspectJAutoProxy(exposeProxy = true)进行指定的,如果配置为true,
- // 则可以使用AopContext.currentProxy()获取到代理对象. 在Spring事务方法自调用的时候经常使用到.
- if (this.advised.exposeProxy) {
- // 将代理对象暴露到ThreadLocal中
- oldProxy = AopContext.setCurrentProxy(proxy);
- setProxyContext = true;
- }
-
- // 3.获得目标对象实例
- target = targetSource.getTarget();
-
- // 获取目标对象的类型
- Class<?> targetClass = (target != null ? target.getClass() : null);
-
- // 4.获得目标方法对应的拦截器链
- List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
-
- // 5.如果对应的拦截器链为空,也就是没有可以应用到此方法的通知(Interceptor),则直接通过反射方式进行调用 ==> method.invoke(target, args)
- if (chain.isEmpty()) {
- // We can skip creating a MethodInvocation: just invoke the target directly
- // Note that the final invoker must be an InvokerInterceptor so we know it does
- // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
- Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
- retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
- } else {
- // 6.如果拦截器链不为空,则需要创建一个MethodInvocation(方法调用对象)
- MethodInvocation invocation =
- new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
- // 调用其proceed方法,实际上是调用ReflectiveMethodInvocation.proceed()方法
- retVal = invocation.proceed();
- }
-
- // 7.必要时转换返回值
- Class<?> returnType = method.getReturnType();
- if (retVal != null && retVal == target &&
- returnType != Object.class && returnType.isInstance(proxy) &&
- !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
- // Special case: it returned "this" and the return type of the method
- // is type-compatible. Note that we can't help if the target sets
- // a reference to itself in another returned object.
- retVal = proxy;
- }
- else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
- throw new AopInvocationException(
- "Null return value from advice does not match primitive return type for: " + method);
- }
- return retVal;
- } finally {
- if (target != null && !targetSource.isStatic()) {
- // Must have come from TargetSource.
- targetSource.releaseTarget(target);
- }
-
- if (setProxyContext) {
- // Restore old proxy.
-
- // 恢复ThreadLocal中的旧代理对象
- AopContext.setCurrentProxy(oldProxy);
- }
- }
- }
invoke()方法的处理流程大体如下:
invoke()方法内部有三个比较重要的方法:
上述三个方法在《Spring AOP原理分析(六)-- AOP执行流程》中已经详细介绍过,读者朋友可参考前面的文章。
接下来我们进入invocation.proceed()方法,这里使用了递归调用和责任链设计模式:
- public Object proceed() throws Throwable {
- // We start with an index of -1 and increment early.
-
- // 1.当前拦截器下标:从-1的位置开始, 直到满足索引下标 =(拦截器的长度 -1)的条件(所有拦截器都执行完毕),此时需要执行目标方法
- if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
- // 执行目标方法
- return invokeJoinpoint();
- }
-
- // 2.每次调用时,将索引的值递增,从拦截器链中获取下一个需要执行的拦截器
- Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
-
- // 动态方法匹配拦截器
- if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
- // Evaluate dynamic method matcher here: static part will already have
- // been evaluated and found to match.
- InterceptorAndDynamicMethodMatcher dm =
- (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
- // 目标类所属类型
- Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
-
- // 动态匹配,判断运行时参数是否匹配
- if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
- // 如果匹配成功,则执行当前拦截器逻辑
- return dm.interceptor.invoke(this);
- } else {
- // 如果匹配失败,则会跳过当前拦截器interceptor,则会调用proceed()方法执行拦截器链中的下一个拦截器的处理
- return proceed();
- }
-
- } else {
- // It's an interceptor, so we just invoke it: The pointcut will have
- // been evaluated statically before this object was constructed.
-
- // 3.只是一个普通的拦截器,则触发拦截器链责任链的调用,并且参数为ReflectiveMethodInvocation本身
- return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
- }
- }
首先获取到拦截器链中的第一个拦截器,也就是前面的TransactionInterceptor,然后执行TransactionInterceptor#invoke方法:
- public Object invoke(MethodInvocation invocation) throws Throwable {
- // Work out the target class: may be {@code null}.
- // The TransactionAttributeSource should be passed the target class
- // as well as the method, which may be from an interface.
- Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
-
- // Adapt to TransactionAspectSupport's invokeWithinTransaction...
-
- // 以事务的方式调用目标方法。 如public abstract void com.wsh.transaction.UserService.insert()
- return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
- }
通过调用父类TransactionAspectSupport的invokeWithinTransaction方法进行事务处理,以事务的方式调用目标方法,也就是使用事务调用public abstract void com.wsh.transaction.UserService.insert()方法。
- protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
- final InvocationCallback invocation) throws Throwable {
-
- // 获取事务属性源
- TransactionAttributeSource tas = getTransactionAttributeSource();
-
- // 获取事务对应的属性
- final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
-
- // 获取事务管理器
- final PlatformTransactionManager tm = determineTransactionManager(txAttr);
-
- // 组装方法的唯一标识,例如com.wsh.transaction.UserServiceImpl.insert
- final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
-
- // 声明式事务的处理
- if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
- // Standard transaction demarcation with getTransaction and commit/rollback calls.
-
- // 创建一个事务
- TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
-
- Object retVal;
- try {
- // This is an around advice: Invoke the next interceptor in the chain.
- // This will normally result in a target object being invoked.
-
- // 执行增强方法(环绕通知)
- retVal = invocation.proceedWithInvocation();
- }
- catch (Throwable ex) {
- // target invocation exception
-
- // Spring默认只对RuntimeException进行回滚
- // 目标方法执行抛出异常,根据异常类型执行事务提交或者回滚操作
- completeTransactionAfterThrowing(txInfo, ex);
- throw ex;
- }
- finally {
- // 清理当前线程事务信息
- cleanupTransactionInfo(txInfo);
- }
- // 目标方法执行成功,提交事务
- commitTransactionAfterReturning(txInfo);
- return retVal;
- }
-
- else {
- // 编程式事务的处理
-
- Object result;
- final ThrowableHolder throwableHolder = new ThrowableHolder();
-
- // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
- try {
- result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
- TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
- try {
- return invocation.proceedWithInvocation();
- }
- catch (Throwable ex) {
- if (txAttr.rollbackOn(ex)) {
- // A RuntimeException: will lead to a rollback.
- if (ex instanceof RuntimeException) {
- throw (RuntimeException) ex;
- }
- else {
- throw new ThrowableHolderException(ex);
- }
- }
- else {
- // A normal return value: will lead to a commit.
- throwableHolder.throwable = ex;
- return null;
- }
- }
- finally {
- cleanupTransactionInfo(txInfo);
- }
- });
- }
- catch (ThrowableHolderException ex) {
- throw ex.getCause();
- }
- catch (TransactionSystemException ex2) {
- if (throwableHolder.throwable != null) {
- logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
- ex2.initApplicationException(throwableHolder.throwable);
- }
- throw ex2;
- }
- catch (Throwable ex2) {
- if (throwableHolder.throwable != null) {
- logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
- }
- throw ex2;
- }
-
- // Check result state: It might indicate a Throwable to rethrow.
- if (throwableHolder.throwable != null) {
- throw throwableHolder.throwable;
- }
- return result;
- }
- }
TransactionAspectSupport#invokeWithinTransaction()方法的处理流程:
可以看到,invokeWithinTransaction()方法中有几个比较关键的方法,下面逐个进行分析。
- // org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary
- protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
- @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
-
- // If no name specified, apply method identification as transaction name.
- // 如果没有指定名称,则使用方法唯一标识
- if (txAttr != null && txAttr.getName() == null) {
- txAttr = new DelegatingTransactionAttribute(txAttr) {
- @Override
- public String getName() {
- return joinpointIdentification;
- }
- };
- }
-
- TransactionStatus status = null;
- if (txAttr != null) {
- if (tm != null) {
- // 根据指定的传播行为、隔离级别等事务属性,返回当前活动事务或创建一个新的事务
- status = tm.getTransaction(txAttr);
- }
- else {
- if (logger.isDebugEnabled()) {
- logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
- "] because no transaction manager has been configured");
- }
- }
- }
- // 准备TransactionInfo
- return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
- }
createTransactionIfNecessary()创建事务主要分为两个步骤:
- public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
- // 获取事务对象,如DataSourceTransactionObject
- Object transaction = doGetTransaction();
-
- // Cache debug flag to avoid repeated checks.
- boolean debugEnabled = logger.isDebugEnabled();
-
- if (definition == null) {
- // Use defaults if no transaction definition given.
- definition = new DefaultTransactionDefinition();
- }
-
- // 判断当前线程是否存在事务
- // 判断依据:当前线程记录的连接不为空,并且连接中(connectionHolder)中的transactionActive属性为true
- if (isExistingTransaction(transaction)) {
- // Existing transaction found -> check propagation behavior to find out how to behave.
-
- // 当前线程已经存在事务
- return handleExistingTransaction(definition, transaction, debugEnabled);
- }
-
- // Check definition settings for new transaction.
-
- // 事务超时时间验证
- if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
- throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
- }
-
- // No existing transaction found -> check propagation behavior to find out how to proceed.
- // 当前线程不存在事务,并且PropagationBehavior(传播行为)是PROPAGATION_MANDATORY的话,会抛出异常
- if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
- throw new IllegalTransactionStateException(
- "No existing transaction found for transaction marked with propagation 'mandatory'");
- }
-
- else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
- definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
- definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
- // 空挂起
- SuspendedResourcesHolder suspendedResources = suspend(null);
- if (debugEnabled) {
- logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
- }
- try {
- boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
- // 传播行为是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED的话,都需要创建一个事务
- DefaultTransactionStatus status = newTransactionStatus(
- definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
-
- // 开始一个新事务,根据给定的事务语义定义
- doBegin(transaction, definition);
-
- // 准备Synchronization
- prepareSynchronization(status, definition);
- return status;
- }
- catch (RuntimeException | Error ex) {
- resume(null, suspendedResources);
- throw ex;
- }
- }
- else {
- // Create "empty" transaction: no actual transaction, but potentially synchronization.
- if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
- logger.warn("Custom isolation level specified but no actual transaction initiated; " +
- "isolation level will effectively be ignored: " + definition);
- }
- boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
- return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
- }
- }
首先通过doGetTransaction()方法获取事务对象,在本例中是DataSourceTransactionObject:
- // org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction
- protected Object doGetTransaction() {
- DataSourceTransactionObject txObject = new DataSourceTransactionObject();
- txObject.setSavepointAllowed(isNestedTransactionAllowed());
- ConnectionHolder conHolder =
- (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
- txObject.setConnectionHolder(conHolder, false);
- return txObject;
- }
然后执行isExistingTransaction(transaction):判断当前线程是否存在事务。判断的依据是:当前线程记录的连接不为空,并且连接中(connectionHolder)中的transactionActive属性为true。
- protected boolean isExistingTransaction(Object transaction) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
- return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
- }
-
- public boolean hasConnectionHolder() {
- return (this.connectionHolder != null);
- }
接着根据我们配置的不同的传播行为进行处理。由于这里我们没有指定传播行为,所以使用默认的PROPAGATION_REQUIRED,所以会执行下面的逻辑:
首先执行suspend(null)方法进行空挂起,对于挂起操作,主要目的是记录原有事务的状态,以便于后续操作对事务的恢复。
- protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
- // 判断当前线程中是否存在已经激活的事务
- if (TransactionSynchronizationManager.isSynchronizationActive()) {
- List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
- try {
- Object suspendedResources = null;
- if (transaction != null) {
- suspendedResources = doSuspend(transaction);
- }
- // 当前已经存在的事务名称
- String name = TransactionSynchronizationManager.getCurrentTransactionName();
- // 将ThreadLocal<String> currentTransactionName的值置空
- TransactionSynchronizationManager.setCurrentTransactionName(null);
- // 是否是只读事务
- boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
- // 将ThreadLocal<Boolean> currentTransactionReadOnly的值置空
- TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
- // 事务隔离级别
- Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
- // 将ThreadLocal<Integer> currentTransactionIsolationLevel的值置空
- TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
- // 事务是否激活状态
- boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
- // 将ThreadLocal<Boolean> actualTransactionActive的值置空
- TransactionSynchronizationManager.setActualTransactionActive(false);
- // 把前面从线程变量中获取出来的存在事务属性封装为挂起的事务属性返回出去
- return new SuspendedResourcesHolder(
- suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
- }
- catch (RuntimeException | Error ex) {
- // doSuspend failed - original transaction is still active...
- doResumeSynchronization(suspendedSynchronizations);
- throw ex;
- }
- }
- else if (transaction != null) {
- // Transaction active but no synchronization active.
- Object suspendedResources = doSuspend(transaction);
- return new SuspendedResourcesHolder(suspendedResources);
- }
- else {
- // Neither transaction nor synchronization active.
- return null;
- }
- }
suspend(null)方法执行完成后,通过newTransactionStatus()创建一个DefaultTransactionStatus对象:
- protected DefaultTransactionStatus newTransactionStatus(
- TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
- boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
-
- boolean actualNewSynchronization = newSynchronization &&
- !TransactionSynchronizationManager.isSynchronizationActive();
- return new DefaultTransactionStatus(
- transaction, newTransaction, actualNewSynchronization,
- definition.isReadOnly(), debug, suspendedResources);
- }
然后进入到事务开始,doBegin(transaction, definition)方法:
- // org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
- protected void doBegin(Object transaction, TransactionDefinition definition) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
- Connection con = null;
-
- try {
- if (!txObject.hasConnectionHolder() ||
- txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
- // 获取Connection连接
- Connection newCon = obtainDataSource().getConnection();
- if (logger.isDebugEnabled()) {
- logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
- }
- txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
- }
-
- txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
- con = txObject.getConnectionHolder().getConnection();
-
- // 设置隔离级别
- Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
- txObject.setPreviousIsolationLevel(previousIsolationLevel);
-
- // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
- // so we don't want to do it unnecessarily (for example if we've explicitly
- // configured the connection pool to set it already).
- if (con.getAutoCommit()) {
- txObject.setMustRestoreAutoCommit(true);
- if (logger.isDebugEnabled()) {
- logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
- }
- // 关闭自动提交
- con.setAutoCommit(false);
- }
-
- prepareTransactionalConnection(con, definition);
- // 设置当前线程是否存在事务的依据
- txObject.getConnectionHolder().setTransactionActive(true);
-
- int timeout = determineTimeout(definition);
- if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
- // 设置过期时间
- txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
- }
-
- // Bind the connection holder to the thread.
- if (txObject.isNewConnectionHolder()) {
- // 将当前获取的连接绑定到当前线程
- TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
- }
- }
-
- catch (Throwable ex) {
- if (txObject.isNewConnectionHolder()) {
- DataSourceUtils.releaseConnection(con, obtainDataSource());
- txObject.setConnectionHolder(null, false);
- }
- throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
- }
- }
这里才真正开启了数据库事务,从源码可以看到,DataSourceTransactionManager#doBegin方法的执行过程:
接着执行prepareSynchronization(status, definition)方法:
- protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
- if (status.isNewSynchronization()) {
- // 事务激活标识:设置ThreadLocal<Boolean> actualTransactionActive的值
- TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
- // 事务隔离级别:设置ThreadLocal<Integer> currentTransactionIsolationLevel的值
- TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
- definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
- definition.getIsolationLevel() : null);
- // 是否只读事务:设置ThreadLocal<Boolean> currentTransactionReadOnly的值
- TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
- // 事务名称:设置ThreadLocal<String> currentTransactionName的值
- TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
- // 初始化Synchronization
- TransactionSynchronizationManager.initSynchronization();
- }
- }
到这里,tm.getTransaction(txAttr)根据指定的传播行为、隔离级别等事务属性,返回当前活动事务或创建一个新的事务的过程就执行完了。
prepareTransactionInfo()方法把事务状态和事务属性等信息封装成一个TransactionInfo对象。
- protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
- @Nullable TransactionAttribute txAttr, String joinpointIdentification,
- @Nullable TransactionStatus status) {
- // 创建TransactionInfo对象
- TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
- if (txAttr != null) {
- // We need a transaction for this method...
- if (logger.isTraceEnabled()) {
- logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
- }
- // The transaction manager will flag an error if an incompatible tx already exists.
- txInfo.newTransactionStatus(status);
- }
- else {
- // The TransactionInfo.hasTransaction() method will return false. We created it only
- // to preserve the integrity of the ThreadLocal stack maintained in this class.
- if (logger.isTraceEnabled()) {
- logger.trace("No need to create transaction for [" + joinpointIdentification +
- "]: This method is not transactional.");
- }
- }
-
- // We always bind the TransactionInfo to the thread, even if we didn't create
- // a new transaction here. This guarantees that the TransactionInfo stack
- // will be managed correctly even if no transaction was created by this aspect.
- // 将TransactionInfo绑定到线程中
- txInfo.bindToThread();
- return txInfo;
- }
proceedWithInvocation()其实是一个钩子函数,执行proceedWithInvocation()方法时,会回来ReflectiveMethodInvocation#proceed()方法中,前面已经介绍到,从拦截器链中获取到了第一个拦截器进行执行,从前面可以看到,拦截器链中就只有一个拦截器。所以满足下面这个条件:
将会执行invokeJoinpoint()目标方法,其实就是执行我们的insert()方法:
这里因为没有发生异常,所以接下来就会执行提交事务操作,具体是执行commitTransactionAfterReturning(txInfo)方法,我们一起看下。
提交事务具体是在commitTransactionAfterReturning(txInfo)方法中实现:
- // org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning
- protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
- if (txInfo != null && txInfo.getTransactionStatus() != null) {
- if (logger.isTraceEnabled()) {
- logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
- }
- // 实际上是利用事务管理器进行事务提交操作
- txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
- }
- }
实际上是利用事务管理器进行事务提交操作:
- // org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
- public final void commit(TransactionStatus status) throws TransactionException {
- if (status.isCompleted()) {
- throw new IllegalTransactionStateException(
- "Transaction is already completed - do not call commit or rollback more than once per transaction");
- }
-
- DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
- // 如果在事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚
- if (defStatus.isLocalRollbackOnly()) {
- if (defStatus.isDebug()) {
- logger.debug("Transactional code has requested rollback");
- }
- processRollback(defStatus, false);
- return;
- }
-
- if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
- if (defStatus.isDebug()) {
- logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
- }
- processRollback(defStatus, true);
- return;
- }
-
- // 执行事务提交
- processCommit(defStatus);
- }
进入processCommit(defStatus)方法执行事务提交:
- // org.springframework.transaction.support.AbstractPlatformTransactionManager#processCommit
- private void processCommit(DefaultTransactionStatus status) throws TransactionException {
- try {
- boolean beforeCompletionInvoked = false;
-
- try {
- boolean unexpectedRollback = false;
- // 扩展点
- prepareForCommit(status);
- // 触发beforeCommit的回调
- triggerBeforeCommit(status);
- // 触发beforeCompletion的回调
- triggerBeforeCompletion(status);
- beforeCompletionInvoked = true;
-
- if (status.hasSavepoint()) {
- if (status.isDebug()) {
- logger.debug("Releasing transaction savepoint");
- }
- unexpectedRollback = status.isGlobalRollbackOnly();
- // 如果存在保存点信息,则清除保存点信息
- status.releaseHeldSavepoint();
- }
- else if (status.isNewTransaction()) {
- if (status.isDebug()) {
- logger.debug("Initiating transaction commit");
- }
- unexpectedRollback = status.isGlobalRollbackOnly();
- // 如果是独立事务的话,则直接提交
- doCommit(status);
- }
- else if (isFailEarlyOnGlobalRollbackOnly()) {
- unexpectedRollback = status.isGlobalRollbackOnly();
- }
-
- // Throw UnexpectedRollbackException if we have a global rollback-only
- // marker but still didn't get a corresponding exception from commit.
- if (unexpectedRollback) {
- throw new UnexpectedRollbackException(
- "Transaction silently rolled back because it has been marked as rollback-only");
- }
- }
- catch (UnexpectedRollbackException ex) {
- // can only be caused by doCommit
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
- throw ex;
- }
- catch (TransactionException ex) {
- // can only be caused by doCommit
- if (isRollbackOnCommitFailure()) {
- // 提交过程中出现异常则回滚
- doRollbackOnCommitException(status, ex);
- }
- else {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
- }
- throw ex;
- }
- catch (RuntimeException | Error ex) {
- if (!beforeCompletionInvoked) {
- triggerBeforeCompletion(status);
- }
- doRollbackOnCommitException(status, ex);
- throw ex;
- }
-
- // Trigger afterCommit callbacks, with an exception thrown there
- // propagated to callers but the transaction still considered as committed.
- try {
- triggerAfterCommit(status);
- }
- finally {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
- }
-
- }
- finally {
- cleanupAfterCompletion(status);
- }
- }
进入doCommit(status)方法:
- // org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
- protected void doCommit(DefaultTransactionStatus status) {
- // 获取到事务
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
- // 然后拿到数据库连接对象
- Connection con = txObject.getConnectionHolder().getConnection();
- if (status.isDebug()) {
- logger.debug("Committing JDBC transaction on Connection [" + con + "]");
- }
- try {
- // 通过Connection执行事务提交
- con.commit();
- }
- catch (SQLException ex) {
- throw new TransactionSystemException("Could not commit JDBC transaction", ex);
- }
- }
从源码中可以看到,Spring事务提交最底层其实还是依赖于数据库连接Connection的commit()方法进行提交的。
为了模拟事务操作发生异常,需要修改业务层代码:
重新调试程序,当发生异常时,会进入completeTransactionAfterThrowing(txInfo, ex)方法:
- // org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing
- protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
- // 首先判断当前是否存在事务
- if (txInfo != null && txInfo.getTransactionStatus() != null) {
- if (logger.isTraceEnabled()) {
- logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
- "] after exception: " + ex);
- }
-
- // 只能针对RuntimeException或者Error类型的异常进行回滚
- if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
- try {
- // 根据TransactionStatus的信息回滚
- txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
- } catch (TransactionSystemException ex2) {
- logger.error("Application exception overridden by rollback exception", ex);
- ex2.initApplicationException(ex);
- throw ex2;
- }
- catch (RuntimeException | Error ex2) {
- logger.error("Application exception overridden by rollback exception", ex);
- throw ex2;
- }
- }
- else {
- // We don't roll back on this exception.
- // Will still roll back if TransactionStatus.isRollbackOnly() is true.
- try {
- // 如果不满足回滚的条件,即使抛出异常也不会回滚,同样会进行提交
- txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
- }
- catch (TransactionSystemException ex2) {
- logger.error("Application exception overridden by commit exception", ex);
- ex2.initApplicationException(ex);
- throw ex2;
- }
- catch (RuntimeException | Error ex2) {
- logger.error("Application exception overridden by commit exception", ex);
- throw ex2;
- }
- }
- }
- }
completeTransactionAfterThrowing()方法首先会判断当前是否存在事务,然后通过rollbackOn()方法判断当前程序抛出的异常是否满足回滚的条件,Spring只能针对RuntimeException或者Error类型的异常进行回滚,如下所示:
- // org.springframework.transaction.interceptor.DefaultTransactionAttribute#rollbackOn
- public boolean rollbackOn(Throwable ex) {
- return (ex instanceof RuntimeException || ex instanceof Error);
- }
将会执行rollback()方法:
- // org.springframework.transaction.support.AbstractPlatformTransactionManager#rollback
- public final void rollback(TransactionStatus status) throws TransactionException {
- // 如果事务已经提交成,则再次回滚会抛出异常
- if (status.isCompleted()) {
- throw new IllegalTransactionStateException(
- "Transaction is already completed - do not call commit or rollback more than once per transaction");
- }
-
- DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
- // 执行回滚操作
- processRollback(defStatus, false);
- }
进入processRollback()方法:
- private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
- try {
- boolean unexpectedRollback = unexpected;
-
- try {
- // 触发beforeCompletion回调
- triggerBeforeCompletion(status);
-
- if (status.hasSavepoint()) {
- if (status.isDebug()) {
- logger.debug("Rolling back transaction to savepoint");
- }
- // 如果存在保存点的话,也就是当前事务为单独的线程,则回退到保存点。根据底层数据库连接进行的
- status.rollbackToHeldSavepoint();
- }
- else if (status.isNewTransaction()) {
- if (status.isDebug()) {
- logger.debug("Initiating transaction rollback");
- }
- // 如果当前事务为独立的新事务,则直接回退
- // java.sql.Connection.rollback()
- doRollback(status);
- }
- else {
- // Participating in larger transaction
- if (status.hasTransaction()) {
- if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
- if (status.isDebug()) {
- logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
- }
- // 如果当前事务不是独立的事务,那么只能标记状态,等到事务链执行完毕后统一回滚
- doSetRollbackOnly(status);
- }
- else {
- if (status.isDebug()) {
- logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
- }
- }
- }
- else {
- logger.debug("Should roll back transaction but cannot - no transaction available");
- }
- // Unexpected rollback only matters here if we're asked to fail early
- if (!isFailEarlyOnGlobalRollbackOnly()) {
- unexpectedRollback = false;
- }
- }
- }
- catch (RuntimeException | Error ex) {
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
- throw ex;
- }
-
- // 触发afterCompletion()方法的回调
- triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
-
- // Raise UnexpectedRollbackException if we had a global rollback-only marker
- if (unexpectedRollback) {
- throw new UnexpectedRollbackException(
- "Transaction rolled back because it has been marked as rollback-only");
- }
- }
- finally {
- // 清除记录的资源并将挂起的资源恢复
- cleanupAfterCompletion(status);
- }
- }
真正执行回滚是在doRollback(status)方法:
- protected void doRollback(DefaultTransactionStatus status) {
- DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
- // 获取数据库连接对象
- Connection con = txObject.getConnectionHolder().getConnection();
- if (status.isDebug()) {
- logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
- }
- try {
- // 通过Connection进行回滚
- con.rollback();
- }
- catch (SQLException ex) {
- throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
- }
- }
跟事务提交一样,底层借助数据库连接对象Connection的rollback()方法实现事务的回滚。
前面我们使用的是默认的传播行为进行介绍,当然还有其它传播行为,读者朋友可以扩展一下,各种传播行为,Spring内部是如何处理的。
通过前面的分析,我们知道了Spring声明式事务的原理是通过AOP动态拦截事务方法的执行,然后通过环绕通知的方式进行增强,在目标方法执行之前开启事务,在目标方法执行之后提交或者回滚事务。如果事务方法执行未发生异常,将会执行事务提交;如果事务方法执行发生了异常,将会执行事务回滚。注意Spring只能针对RuntimeException或者Error类型的异常进行回滚,并且底层是利用数据库连接Connection对象来执行事务提交或者事务回滚的。
以上就是关于事务代理调用过程的分析,由于笔者水平有限,如文章中有不对之处,还请大家指正,一起学习,一起进步!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。