当前位置:   article > 正文

Spring事务原理分析(三)--事务代理调用过程_invokewithintransaction 嵌套事务处理过程

invokewithintransaction 嵌套事务处理过程

目录

一、概述

二、事务代理调用过程分析

三、createTransactionIfNecessary()创建事务

四、invocation.proceedWithInvocation()执行增强方法

五、commitTransactionAfterReturning(txInfo)提交事务

六、completeTransactionAfterThrowing(txInfo, ex)异常回滚事务

七、总结


一、概述

通过前面一篇文章,我们已经了解了AOP切面织入生成代理对象的过程,当Bean方法通过代理对象调用时,会触发对应的AOP增强拦截器,前面提到声明式事务是一种环绕增强,对应接口为MethodInterceptor,事务增强对该接口的实现为TransactionInterceptor。这里我们以JDK动态代理为例,分析一下Spring事务代理调用的整体过程。

二、事务代理调用过程分析

执行userService.insert()方法,实际上是执行的JdkDynamicAopProxy#invoke()方法:

  1. // org.springframework.aop.framework.JdkDynamicAopProxy.invoke
  2. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  3. Object oldProxy = null;
  4. boolean setProxyContext = false;
  5. // 1.获取到目标对象,也就是被代理对象的引用
  6. TargetSource targetSource = this.advised.targetSource;
  7. Object target = null;
  8. try {
  9. // 如果目标对象未实现equals()方法,则不需要代理
  10. if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
  11. // The target does not implement the equals(Object) method itself.
  12. return equals(args[0]);
  13. }
  14. // 如果目标对象未实现hashCode()方法,则不需要代理
  15. else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
  16. // The target does not implement the hashCode() method itself.
  17. return hashCode();
  18. }
  19. // 如果方法是DecoratingProxy类型,也不需要拦截器执行
  20. else if (method.getDeclaringClass() == DecoratingProxy.class) {
  21. // There is only getDecoratedClass() declared -> dispatch to proxy config.
  22. return AopProxyUtils.ultimateTargetClass(this.advised);
  23. }
  24. // 如果是Advised接口或者其父接口中定义的方法,则直接反射调用,不应用通知
  25. else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
  26. method.getDeclaringClass().isAssignableFrom(Advised.class)) {
  27. // Service invocations on ProxyConfig with the proxy config...
  28. return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
  29. }
  30. Object retVal;
  31. // 2. 如果exposeProxy属性值为true,则将代理对象暴露到ThreadLocal中
  32. // exposeProxy是通过注解@EnableAspectJAutoProxy(exposeProxy = true)进行指定的,如果配置为true,
  33. // 则可以使用AopContext.currentProxy()获取到代理对象. 在Spring事务方法自调用的时候经常使用到.
  34. if (this.advised.exposeProxy) {
  35. // 将代理对象暴露到ThreadLocal中
  36. oldProxy = AopContext.setCurrentProxy(proxy);
  37. setProxyContext = true;
  38. }
  39. // 3.获得目标对象实例
  40. target = targetSource.getTarget();
  41. // 获取目标对象的类型
  42. Class<?> targetClass = (target != null ? target.getClass() : null);
  43. // 4.获得目标方法对应的拦截器链
  44. List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
  45. // 5.如果对应的拦截器链为空,也就是没有可以应用到此方法的通知(Interceptor),则直接通过反射方式进行调用 ==> method.invoke(target, args)
  46. if (chain.isEmpty()) {
  47. // We can skip creating a MethodInvocation: just invoke the target directly
  48. // Note that the final invoker must be an InvokerInterceptor so we know it does
  49. // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
  50. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
  51. retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
  52. } else {
  53. // 6.如果拦截器链不为空,则需要创建一个MethodInvocation(方法调用对象)
  54. MethodInvocation invocation =
  55. new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
  56. // 调用其proceed方法,实际上是调用ReflectiveMethodInvocation.proceed()方法
  57. retVal = invocation.proceed();
  58. }
  59. // 7.必要时转换返回值
  60. Class<?> returnType = method.getReturnType();
  61. if (retVal != null && retVal == target &&
  62. returnType != Object.class && returnType.isInstance(proxy) &&
  63. !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
  64. // Special case: it returned "this" and the return type of the method
  65. // is type-compatible. Note that we can't help if the target sets
  66. // a reference to itself in another returned object.
  67. retVal = proxy;
  68. }
  69. else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
  70. throw new AopInvocationException(
  71. "Null return value from advice does not match primitive return type for: " + method);
  72. }
  73. return retVal;
  74. } finally {
  75. if (target != null && !targetSource.isStatic()) {
  76. // Must have come from TargetSource.
  77. targetSource.releaseTarget(target);
  78. }
  79. if (setProxyContext) {
  80. // Restore old proxy.
  81. // 恢复ThreadLocal中的旧代理对象
  82. AopContext.setCurrentProxy(oldProxy);
  83. }
  84. }
  85. }

invoke()方法的处理流程大体如下:

  1. 获取到目标对象,也就是被代理对象的引用;
  2. 处理exposeProxy属性,如果exposeProxy属性值为true,则将代理对象暴露到ThreadLocal中;
  3. 获得目标方法对应的拦截器链;
  4. 如果对应的拦截器链为空,也就是没有可以应用到此方法的通知(Interceptor),则直接通过反射方式进行调用;
  5. 如果拦截器链不为空,则需要创建一个MethodInvocation(方法调用对象),调用其proceed方法,实际上是调用ReflectiveMethodInvocation.proceed()方法;
  6. 有需要的话,转换返回值;

invoke()方法内部有三个比较重要的方法:

  • getInterceptorsAndDynamicInterceptionAdvice():获得目标方法对应的拦截器链

  • invokeJoinpointUsingReflection():通过反射方式调用目标方法
  • MethodInvocation#proceed():执行拦截器链方法

上述三个方法在《Spring AOP原理分析(六)-- AOP执行流程》中已经详细介绍过,读者朋友可参考前面的文章。

接下来我们进入invocation.proceed()方法,这里使用了递归调用和责任链设计模式:

  1. public Object proceed() throws Throwable {
  2. // We start with an index of -1 and increment early.
  3. // 1.当前拦截器下标:从-1的位置开始, 直到满足索引下标 =(拦截器的长度 -1)的条件(所有拦截器都执行完毕),此时需要执行目标方法
  4. if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
  5. // 执行目标方法
  6. return invokeJoinpoint();
  7. }
  8. // 2.每次调用时,将索引的值递增,从拦截器链中获取下一个需要执行的拦截器
  9. Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
  10. // 动态方法匹配拦截器
  11. if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
  12. // Evaluate dynamic method matcher here: static part will already have
  13. // been evaluated and found to match.
  14. InterceptorAndDynamicMethodMatcher dm =
  15. (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
  16. // 目标类所属类型
  17. Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
  18. // 动态匹配,判断运行时参数是否匹配
  19. if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
  20. // 如果匹配成功,则执行当前拦截器逻辑
  21. return dm.interceptor.invoke(this);
  22. } else {
  23. // 如果匹配失败,则会跳过当前拦截器interceptor,则会调用proceed()方法执行拦截器链中的下一个拦截器的处理
  24. return proceed();
  25. }
  26. } else {
  27. // It's an interceptor, so we just invoke it: The pointcut will have
  28. // been evaluated statically before this object was constructed.
  29. // 3.只是一个普通的拦截器,则触发拦截器链责任链的调用,并且参数为ReflectiveMethodInvocation本身
  30. return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
  31. }
  32. }

首先获取到拦截器链中的第一个拦截器,也就是前面的TransactionInterceptor,然后执行TransactionInterceptor#invoke方法:

  1. public Object invoke(MethodInvocation invocation) throws Throwable {
  2. // Work out the target class: may be {@code null}.
  3. // The TransactionAttributeSource should be passed the target class
  4. // as well as the method, which may be from an interface.
  5. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  6. // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  7. // 以事务的方式调用目标方法。 如public abstract void com.wsh.transaction.UserService.insert()
  8. return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
  9. }

通过调用父类TransactionAspectSupport的invokeWithinTransaction方法进行事务处理,以事务的方式调用目标方法,也就是使用事务调用public abstract void com.wsh.transaction.UserService.insert()方法。

  1. protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
  2. final InvocationCallback invocation) throws Throwable {
  3. // 获取事务属性源
  4. TransactionAttributeSource tas = getTransactionAttributeSource();
  5. // 获取事务对应的属性
  6. final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  7. // 获取事务管理器
  8. final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  9. // 组装方法的唯一标识,例如com.wsh.transaction.UserServiceImpl.insert
  10. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  11. // 声明式事务的处理
  12. if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
  13. // Standard transaction demarcation with getTransaction and commit/rollback calls.
  14. // 创建一个事务
  15. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
  16. Object retVal;
  17. try {
  18. // This is an around advice: Invoke the next interceptor in the chain.
  19. // This will normally result in a target object being invoked.
  20. // 执行增强方法(环绕通知)
  21. retVal = invocation.proceedWithInvocation();
  22. }
  23. catch (Throwable ex) {
  24. // target invocation exception
  25. // Spring默认只对RuntimeException进行回滚
  26. // 目标方法执行抛出异常,根据异常类型执行事务提交或者回滚操作
  27. completeTransactionAfterThrowing(txInfo, ex);
  28. throw ex;
  29. }
  30. finally {
  31. // 清理当前线程事务信息
  32. cleanupTransactionInfo(txInfo);
  33. }
  34. // 目标方法执行成功,提交事务
  35. commitTransactionAfterReturning(txInfo);
  36. return retVal;
  37. }
  38. else {
  39. // 编程式事务的处理
  40. Object result;
  41. final ThrowableHolder throwableHolder = new ThrowableHolder();
  42. // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
  43. try {
  44. result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
  45. TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  46. try {
  47. return invocation.proceedWithInvocation();
  48. }
  49. catch (Throwable ex) {
  50. if (txAttr.rollbackOn(ex)) {
  51. // A RuntimeException: will lead to a rollback.
  52. if (ex instanceof RuntimeException) {
  53. throw (RuntimeException) ex;
  54. }
  55. else {
  56. throw new ThrowableHolderException(ex);
  57. }
  58. }
  59. else {
  60. // A normal return value: will lead to a commit.
  61. throwableHolder.throwable = ex;
  62. return null;
  63. }
  64. }
  65. finally {
  66. cleanupTransactionInfo(txInfo);
  67. }
  68. });
  69. }
  70. catch (ThrowableHolderException ex) {
  71. throw ex.getCause();
  72. }
  73. catch (TransactionSystemException ex2) {
  74. if (throwableHolder.throwable != null) {
  75. logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
  76. ex2.initApplicationException(throwableHolder.throwable);
  77. }
  78. throw ex2;
  79. }
  80. catch (Throwable ex2) {
  81. if (throwableHolder.throwable != null) {
  82. logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
  83. }
  84. throw ex2;
  85. }
  86. // Check result state: It might indicate a Throwable to rethrow.
  87. if (throwableHolder.throwable != null) {
  88. throw throwableHolder.throwable;
  89. }
  90. return result;
  91. }
  92. }

TransactionAspectSupport#invokeWithinTransaction()方法的处理流程:

  1. 获取事务属性源;
  2. 获取事务对应的属性;
  3. 获取PlatformTransactionManager事务管理器;
  4. 组装方法的唯一标识,例如com.wsh.transaction.UserServiceImpl.insert;
  5. 针对声明式事务、编程式事务的处理,在项目中我们通常都是使用声明式事务的方式,所以这里以声明式事务为例进行介绍;
  6. 通过createTransactionIfNecessary()方法创建一个事务;
  7. invocation.proceedWithInvocation()执行增强方法(环绕通知);
  8. 如果事务方法执行未发生异常,则调用commitTransactionAfterReturning(txInfo)进行提交事务;
  9. 如果事务方法执行发生异常,则调用completeTransactionAfterThrowing(txInfo, ex)进行异常回滚;

可以看到,invokeWithinTransaction()方法中有几个比较关键的方法,下面逐个进行分析。

三、createTransactionIfNecessary()创建事务

  1. // org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary
  2. protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
  3. @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
  4. // If no name specified, apply method identification as transaction name.
  5. // 如果没有指定名称,则使用方法唯一标识
  6. if (txAttr != null && txAttr.getName() == null) {
  7. txAttr = new DelegatingTransactionAttribute(txAttr) {
  8. @Override
  9. public String getName() {
  10. return joinpointIdentification;
  11. }
  12. };
  13. }
  14. TransactionStatus status = null;
  15. if (txAttr != null) {
  16. if (tm != null) {
  17. // 根据指定的传播行为、隔离级别等事务属性,返回当前活动事务或创建一个新的事务
  18. status = tm.getTransaction(txAttr);
  19. }
  20. else {
  21. if (logger.isDebugEnabled()) {
  22. logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
  23. "] because no transaction manager has been configured");
  24. }
  25. }
  26. }
  27. // 准备TransactionInfo
  28. return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  29. }

createTransactionIfNecessary()创建事务主要分为两个步骤:

  • (1)、tm.getTransaction(txAttr)返回当前活动事务或创建一个新的事务
  1. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
  2. // 获取事务对象,如DataSourceTransactionObject
  3. Object transaction = doGetTransaction();
  4. // Cache debug flag to avoid repeated checks.
  5. boolean debugEnabled = logger.isDebugEnabled();
  6. if (definition == null) {
  7. // Use defaults if no transaction definition given.
  8. definition = new DefaultTransactionDefinition();
  9. }
  10. // 判断当前线程是否存在事务
  11. // 判断依据:当前线程记录的连接不为空,并且连接中(connectionHolder)中的transactionActive属性为true
  12. if (isExistingTransaction(transaction)) {
  13. // Existing transaction found -> check propagation behavior to find out how to behave.
  14. // 当前线程已经存在事务
  15. return handleExistingTransaction(definition, transaction, debugEnabled);
  16. }
  17. // Check definition settings for new transaction.
  18. // 事务超时时间验证
  19. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
  20. throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
  21. }
  22. // No existing transaction found -> check propagation behavior to find out how to proceed.
  23. // 当前线程不存在事务,并且PropagationBehavior(传播行为)是PROPAGATION_MANDATORY的话,会抛出异常
  24. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
  25. throw new IllegalTransactionStateException(
  26. "No existing transaction found for transaction marked with propagation 'mandatory'");
  27. }
  28. else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  29. definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  30. definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  31. // 空挂起
  32. SuspendedResourcesHolder suspendedResources = suspend(null);
  33. if (debugEnabled) {
  34. logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
  35. }
  36. try {
  37. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  38. // 传播行为是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED的话,都需要创建一个事务
  39. DefaultTransactionStatus status = newTransactionStatus(
  40. definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  41. // 开始一个新事务,根据给定的事务语义定义
  42. doBegin(transaction, definition);
  43. // 准备Synchronization
  44. prepareSynchronization(status, definition);
  45. return status;
  46. }
  47. catch (RuntimeException | Error ex) {
  48. resume(null, suspendedResources);
  49. throw ex;
  50. }
  51. }
  52. else {
  53. // Create "empty" transaction: no actual transaction, but potentially synchronization.
  54. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
  55. logger.warn("Custom isolation level specified but no actual transaction initiated; " +
  56. "isolation level will effectively be ignored: " + definition);
  57. }
  58. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  59. return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
  60. }
  61. }

首先通过doGetTransaction()方法获取事务对象,在本例中是DataSourceTransactionObject:

  1. // org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction
  2. protected Object doGetTransaction() {
  3. DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  4. txObject.setSavepointAllowed(isNestedTransactionAllowed());
  5. ConnectionHolder conHolder =
  6. (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
  7. txObject.setConnectionHolder(conHolder, false);
  8. return txObject;
  9. }

然后执行isExistingTransaction(transaction):判断当前线程是否存在事务。判断的依据是:当前线程记录的连接不为空,并且连接中(connectionHolder)中的transactionActive属性为true。

  1. protected boolean isExistingTransaction(Object transaction) {
  2. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  3. return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
  4. }
  5. public boolean hasConnectionHolder() {
  6. return (this.connectionHolder != null);
  7. }

接着根据我们配置的不同的传播行为进行处理。由于这里我们没有指定传播行为,所以使用默认的PROPAGATION_REQUIRED,所以会执行下面的逻辑:

首先执行suspend(null)方法进行空挂起,对于挂起操作,主要目的是记录原有事务的状态,以便于后续操作对事务的恢复

  1. protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
  2. // 判断当前线程中是否存在已经激活的事务
  3. if (TransactionSynchronizationManager.isSynchronizationActive()) {
  4. List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
  5. try {
  6. Object suspendedResources = null;
  7. if (transaction != null) {
  8. suspendedResources = doSuspend(transaction);
  9. }
  10. // 当前已经存在的事务名称
  11. String name = TransactionSynchronizationManager.getCurrentTransactionName();
  12. // 将ThreadLocal<String> currentTransactionName的值置空
  13. TransactionSynchronizationManager.setCurrentTransactionName(null);
  14. // 是否是只读事务
  15. boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
  16. // 将ThreadLocal<Boolean> currentTransactionReadOnly的值置空
  17. TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
  18. // 事务隔离级别
  19. Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
  20. // 将ThreadLocal<Integer> currentTransactionIsolationLevel的值置空
  21. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
  22. // 事务是否激活状态
  23. boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
  24. // 将ThreadLocal<Boolean> actualTransactionActive的值置空
  25. TransactionSynchronizationManager.setActualTransactionActive(false);
  26. // 把前面从线程变量中获取出来的存在事务属性封装为挂起的事务属性返回出去
  27. return new SuspendedResourcesHolder(
  28. suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
  29. }
  30. catch (RuntimeException | Error ex) {
  31. // doSuspend failed - original transaction is still active...
  32. doResumeSynchronization(suspendedSynchronizations);
  33. throw ex;
  34. }
  35. }
  36. else if (transaction != null) {
  37. // Transaction active but no synchronization active.
  38. Object suspendedResources = doSuspend(transaction);
  39. return new SuspendedResourcesHolder(suspendedResources);
  40. }
  41. else {
  42. // Neither transaction nor synchronization active.
  43. return null;
  44. }
  45. }

suspend(null)方法执行完成后,通过newTransactionStatus()创建一个DefaultTransactionStatus对象:

  1. protected DefaultTransactionStatus newTransactionStatus(
  2. TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
  3. boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
  4. boolean actualNewSynchronization = newSynchronization &&
  5. !TransactionSynchronizationManager.isSynchronizationActive();
  6. return new DefaultTransactionStatus(
  7. transaction, newTransaction, actualNewSynchronization,
  8. definition.isReadOnly(), debug, suspendedResources);
  9. }

然后进入到事务开始,doBegin(transaction, definition)方法:

  1. // org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
  2. protected void doBegin(Object transaction, TransactionDefinition definition) {
  3. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  4. Connection con = null;
  5. try {
  6. if (!txObject.hasConnectionHolder() ||
  7. txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
  8. // 获取Connection连接
  9. Connection newCon = obtainDataSource().getConnection();
  10. if (logger.isDebugEnabled()) {
  11. logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
  12. }
  13. txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  14. }
  15. txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
  16. con = txObject.getConnectionHolder().getConnection();
  17. // 设置隔离级别
  18. Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
  19. txObject.setPreviousIsolationLevel(previousIsolationLevel);
  20. // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
  21. // so we don't want to do it unnecessarily (for example if we've explicitly
  22. // configured the connection pool to set it already).
  23. if (con.getAutoCommit()) {
  24. txObject.setMustRestoreAutoCommit(true);
  25. if (logger.isDebugEnabled()) {
  26. logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
  27. }
  28. // 关闭自动提交
  29. con.setAutoCommit(false);
  30. }
  31. prepareTransactionalConnection(con, definition);
  32. // 设置当前线程是否存在事务的依据
  33. txObject.getConnectionHolder().setTransactionActive(true);
  34. int timeout = determineTimeout(definition);
  35. if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
  36. // 设置过期时间
  37. txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
  38. }
  39. // Bind the connection holder to the thread.
  40. if (txObject.isNewConnectionHolder()) {
  41. // 将当前获取的连接绑定到当前线程
  42. TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
  43. }
  44. }
  45. catch (Throwable ex) {
  46. if (txObject.isNewConnectionHolder()) {
  47. DataSourceUtils.releaseConnection(con, obtainDataSource());
  48. txObject.setConnectionHolder(null, false);
  49. }
  50. throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
  51. }
  52. }

这里才真正开启了数据库事务,从源码可以看到,DataSourceTransactionManager#doBegin方法的执行过程:

  1. 获取一个数据库连接对象Connection;
  2. 设置DataSourceTransactionObject的一些属性,如隔离级别、是否自动提交、过期时间等;
  3. 通过Connection.setAutoCommit(false)关闭事务的自动提交;
  4. 将当前获取的连接绑定到当前线程,实际上是保存在ThreadLocal<Map<Object, Object>> resources =new NamedThreadLocal<>("Transactional resources")中;resources是一个HashMap,用于记录当前参与事务的事务资源,方便进行事务同步,在DataSourceTransactionManager的例子中就是以dataSource作为key,保存了数据库连接,这样在同一个线程中,不同的方法调用就可以通过dataSource获取相同的数据库连接,从而保证所有操作在一个事务中进行。

接着执行prepareSynchronization(status, definition)方法:

  1. protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
  2. if (status.isNewSynchronization()) {
  3. // 事务激活标识:设置ThreadLocal<Boolean> actualTransactionActive的值
  4. TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
  5. // 事务隔离级别:设置ThreadLocal<Integer> currentTransactionIsolationLevel的值
  6. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
  7. definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
  8. definition.getIsolationLevel() : null);
  9. // 是否只读事务:设置ThreadLocal<Boolean> currentTransactionReadOnly的值
  10. TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
  11. // 事务名称:设置ThreadLocal<String> currentTransactionName的值
  12. TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
  13. // 初始化Synchronization
  14. TransactionSynchronizationManager.initSynchronization();
  15. }
  16. }

到这里,tm.getTransaction(txAttr)根据指定的传播行为、隔离级别等事务属性,返回当前活动事务或创建一个新的事务的过程就执行完了。

  • (2)、prepareTransactionInfo()准备TransactionInfo

prepareTransactionInfo()方法把事务状态和事务属性等信息封装成一个TransactionInfo对象。

  1. protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
  2. @Nullable TransactionAttribute txAttr, String joinpointIdentification,
  3. @Nullable TransactionStatus status) {
  4. // 创建TransactionInfo对象
  5. TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
  6. if (txAttr != null) {
  7. // We need a transaction for this method...
  8. if (logger.isTraceEnabled()) {
  9. logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
  10. }
  11. // The transaction manager will flag an error if an incompatible tx already exists.
  12. txInfo.newTransactionStatus(status);
  13. }
  14. else {
  15. // The TransactionInfo.hasTransaction() method will return false. We created it only
  16. // to preserve the integrity of the ThreadLocal stack maintained in this class.
  17. if (logger.isTraceEnabled()) {
  18. logger.trace("No need to create transaction for [" + joinpointIdentification +
  19. "]: This method is not transactional.");
  20. }
  21. }
  22. // We always bind the TransactionInfo to the thread, even if we didn't create
  23. // a new transaction here. This guarantees that the TransactionInfo stack
  24. // will be managed correctly even if no transaction was created by this aspect.
  25. // 将TransactionInfo绑定到线程中
  26. txInfo.bindToThread();
  27. return txInfo;
  28. }

四、invocation.proceedWithInvocation()执行增强方法

proceedWithInvocation()其实是一个钩子函数,执行proceedWithInvocation()方法时,会回来ReflectiveMethodInvocation#proceed()方法中,前面已经介绍到,从拦截器链中获取到了第一个拦截器进行执行,从前面可以看到,拦截器链中就只有一个拦截器。所以满足下面这个条件:

将会执行invokeJoinpoint()目标方法,其实就是执行我们的insert()方法:

这里因为没有发生异常,所以接下来就会执行提交事务操作,具体是执行commitTransactionAfterReturning(txInfo)方法,我们一起看下。

五、commitTransactionAfterReturning(txInfo)提交事务

提交事务具体是在commitTransactionAfterReturning(txInfo)方法中实现: 

  1. // org.springframework.transaction.interceptor.TransactionAspectSupport#commitTransactionAfterReturning
  2. protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
  3. if (txInfo != null && txInfo.getTransactionStatus() != null) {
  4. if (logger.isTraceEnabled()) {
  5. logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
  6. }
  7. // 实际上是利用事务管理器进行事务提交操作
  8. txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  9. }
  10. }

实际上是利用事务管理器进行事务提交操作:

  1. // org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
  2. public final void commit(TransactionStatus status) throws TransactionException {
  3. if (status.isCompleted()) {
  4. throw new IllegalTransactionStateException(
  5. "Transaction is already completed - do not call commit or rollback more than once per transaction");
  6. }
  7. DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  8. // 如果在事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚
  9. if (defStatus.isLocalRollbackOnly()) {
  10. if (defStatus.isDebug()) {
  11. logger.debug("Transactional code has requested rollback");
  12. }
  13. processRollback(defStatus, false);
  14. return;
  15. }
  16. if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
  17. if (defStatus.isDebug()) {
  18. logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
  19. }
  20. processRollback(defStatus, true);
  21. return;
  22. }
  23. // 执行事务提交
  24. processCommit(defStatus);
  25. }

进入processCommit(defStatus)方法执行事务提交:

  1. // org.springframework.transaction.support.AbstractPlatformTransactionManager#processCommit
  2. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  3. try {
  4. boolean beforeCompletionInvoked = false;
  5. try {
  6. boolean unexpectedRollback = false;
  7. // 扩展点
  8. prepareForCommit(status);
  9. // 触发beforeCommit的回调
  10. triggerBeforeCommit(status);
  11. // 触发beforeCompletion的回调
  12. triggerBeforeCompletion(status);
  13. beforeCompletionInvoked = true;
  14. if (status.hasSavepoint()) {
  15. if (status.isDebug()) {
  16. logger.debug("Releasing transaction savepoint");
  17. }
  18. unexpectedRollback = status.isGlobalRollbackOnly();
  19. // 如果存在保存点信息,则清除保存点信息
  20. status.releaseHeldSavepoint();
  21. }
  22. else if (status.isNewTransaction()) {
  23. if (status.isDebug()) {
  24. logger.debug("Initiating transaction commit");
  25. }
  26. unexpectedRollback = status.isGlobalRollbackOnly();
  27. // 如果是独立事务的话,则直接提交
  28. doCommit(status);
  29. }
  30. else if (isFailEarlyOnGlobalRollbackOnly()) {
  31. unexpectedRollback = status.isGlobalRollbackOnly();
  32. }
  33. // Throw UnexpectedRollbackException if we have a global rollback-only
  34. // marker but still didn't get a corresponding exception from commit.
  35. if (unexpectedRollback) {
  36. throw new UnexpectedRollbackException(
  37. "Transaction silently rolled back because it has been marked as rollback-only");
  38. }
  39. }
  40. catch (UnexpectedRollbackException ex) {
  41. // can only be caused by doCommit
  42. triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
  43. throw ex;
  44. }
  45. catch (TransactionException ex) {
  46. // can only be caused by doCommit
  47. if (isRollbackOnCommitFailure()) {
  48. // 提交过程中出现异常则回滚
  49. doRollbackOnCommitException(status, ex);
  50. }
  51. else {
  52. triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
  53. }
  54. throw ex;
  55. }
  56. catch (RuntimeException | Error ex) {
  57. if (!beforeCompletionInvoked) {
  58. triggerBeforeCompletion(status);
  59. }
  60. doRollbackOnCommitException(status, ex);
  61. throw ex;
  62. }
  63. // Trigger afterCommit callbacks, with an exception thrown there
  64. // propagated to callers but the transaction still considered as committed.
  65. try {
  66. triggerAfterCommit(status);
  67. }
  68. finally {
  69. triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
  70. }
  71. }
  72. finally {
  73. cleanupAfterCompletion(status);
  74. }
  75. }

进入doCommit(status)方法:

  1. // org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
  2. protected void doCommit(DefaultTransactionStatus status) {
  3. // 获取到事务
  4. DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
  5. // 然后拿到数据库连接对象
  6. Connection con = txObject.getConnectionHolder().getConnection();
  7. if (status.isDebug()) {
  8. logger.debug("Committing JDBC transaction on Connection [" + con + "]");
  9. }
  10. try {
  11. // 通过Connection执行事务提交
  12. con.commit();
  13. }
  14. catch (SQLException ex) {
  15. throw new TransactionSystemException("Could not commit JDBC transaction", ex);
  16. }
  17. }

从源码中可以看到,Spring事务提交最底层其实还是依赖于数据库连接Connection的commit()方法进行提交的。

六、completeTransactionAfterThrowing(txInfo, ex)异常回滚事务

为了模拟事务操作发生异常,需要修改业务层代码:

重新调试程序,当发生异常时,会进入completeTransactionAfterThrowing(txInfo, ex)方法:

  1. // org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing
  2. protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
  3. // 首先判断当前是否存在事务
  4. if (txInfo != null && txInfo.getTransactionStatus() != null) {
  5. if (logger.isTraceEnabled()) {
  6. logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
  7. "] after exception: " + ex);
  8. }
  9. // 只能针对RuntimeException或者Error类型的异常进行回滚
  10. if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
  11. try {
  12. // 根据TransactionStatus的信息回滚
  13. txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
  14. } catch (TransactionSystemException ex2) {
  15. logger.error("Application exception overridden by rollback exception", ex);
  16. ex2.initApplicationException(ex);
  17. throw ex2;
  18. }
  19. catch (RuntimeException | Error ex2) {
  20. logger.error("Application exception overridden by rollback exception", ex);
  21. throw ex2;
  22. }
  23. }
  24. else {
  25. // We don't roll back on this exception.
  26. // Will still roll back if TransactionStatus.isRollbackOnly() is true.
  27. try {
  28. // 如果不满足回滚的条件,即使抛出异常也不会回滚,同样会进行提交
  29. txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  30. }
  31. catch (TransactionSystemException ex2) {
  32. logger.error("Application exception overridden by commit exception", ex);
  33. ex2.initApplicationException(ex);
  34. throw ex2;
  35. }
  36. catch (RuntimeException | Error ex2) {
  37. logger.error("Application exception overridden by commit exception", ex);
  38. throw ex2;
  39. }
  40. }
  41. }
  42. }

completeTransactionAfterThrowing()方法首先会判断当前是否存在事务,然后通过rollbackOn()方法判断当前程序抛出的异常是否满足回滚的条件,Spring只能针对RuntimeException或者Error类型的异常进行回滚,如下所示:

  1. // org.springframework.transaction.interceptor.DefaultTransactionAttribute#rollbackOn
  2. public boolean rollbackOn(Throwable ex) {
  3. return (ex instanceof RuntimeException || ex instanceof Error);
  4. }
  • 如果满足回滚事务的条件,则需要进行事务回滚。

将会执行rollback()方法:

  1. // org.springframework.transaction.support.AbstractPlatformTransactionManager#rollback
  2. public final void rollback(TransactionStatus status) throws TransactionException {
  3. // 如果事务已经提交成,则再次回滚会抛出异常
  4. if (status.isCompleted()) {
  5. throw new IllegalTransactionStateException(
  6. "Transaction is already completed - do not call commit or rollback more than once per transaction");
  7. }
  8. DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  9. // 执行回滚操作
  10. processRollback(defStatus, false);
  11. }

进入processRollback()方法:

  1. private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
  2. try {
  3. boolean unexpectedRollback = unexpected;
  4. try {
  5. // 触发beforeCompletion回调
  6. triggerBeforeCompletion(status);
  7. if (status.hasSavepoint()) {
  8. if (status.isDebug()) {
  9. logger.debug("Rolling back transaction to savepoint");
  10. }
  11. // 如果存在保存点的话,也就是当前事务为单独的线程,则回退到保存点。根据底层数据库连接进行的
  12. status.rollbackToHeldSavepoint();
  13. }
  14. else if (status.isNewTransaction()) {
  15. if (status.isDebug()) {
  16. logger.debug("Initiating transaction rollback");
  17. }
  18. // 如果当前事务为独立的新事务,则直接回退
  19. // java.sql.Connection.rollback()
  20. doRollback(status);
  21. }
  22. else {
  23. // Participating in larger transaction
  24. if (status.hasTransaction()) {
  25. if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
  26. if (status.isDebug()) {
  27. logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
  28. }
  29. // 如果当前事务不是独立的事务,那么只能标记状态,等到事务链执行完毕后统一回滚
  30. doSetRollbackOnly(status);
  31. }
  32. else {
  33. if (status.isDebug()) {
  34. logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
  35. }
  36. }
  37. }
  38. else {
  39. logger.debug("Should roll back transaction but cannot - no transaction available");
  40. }
  41. // Unexpected rollback only matters here if we're asked to fail early
  42. if (!isFailEarlyOnGlobalRollbackOnly()) {
  43. unexpectedRollback = false;
  44. }
  45. }
  46. }
  47. catch (RuntimeException | Error ex) {
  48. triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
  49. throw ex;
  50. }
  51. // 触发afterCompletion()方法的回调
  52. triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
  53. // Raise UnexpectedRollbackException if we had a global rollback-only marker
  54. if (unexpectedRollback) {
  55. throw new UnexpectedRollbackException(
  56. "Transaction rolled back because it has been marked as rollback-only");
  57. }
  58. }
  59. finally {
  60. // 清除记录的资源并将挂起的资源恢复
  61. cleanupAfterCompletion(status);
  62. }
  63. }

 真正执行回滚是在doRollback(status)方法:

  1. protected void doRollback(DefaultTransactionStatus status) {
  2. DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
  3. // 获取数据库连接对象
  4. Connection con = txObject.getConnectionHolder().getConnection();
  5. if (status.isDebug()) {
  6. logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
  7. }
  8. try {
  9. // 通过Connection进行回滚
  10. con.rollback();
  11. }
  12. catch (SQLException ex) {
  13. throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
  14. }
  15. }

 跟事务提交一样,底层借助数据库连接对象Connection的rollback()方法实现事务的回滚。

  • 如果不满足回滚事务的条件,则还是会执行事务提交,执行commit(),跟前面的提交事务流程一致,还是利用connection对象进行事务提交。

前面我们使用的是默认的传播行为进行介绍,当然还有其它传播行为,读者朋友可以扩展一下,各种传播行为,Spring内部是如何处理的。

七、总结

通过前面的分析,我们知道了Spring声明式事务的原理是通过AOP动态拦截事务方法的执行,然后通过环绕通知的方式进行增强,在目标方法执行之前开启事务,在目标方法执行之后提交或者回滚事务。如果事务方法执行未发生异常,将会执行事务提交;如果事务方法执行发生了异常,将会执行事务回滚。注意Spring只能针对RuntimeException或者Error类型的异常进行回滚,并且底层是利用数据库连接Connection对象来执行事务提交或者事务回滚的。

以上就是关于事务代理调用过程的分析,由于笔者水平有限,如文章中有不对之处,还请大家指正,一起学习,一起进步!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/225420
推荐阅读
相关标签
  

闽ICP备14008679号