赞
踩
本文所用的Spring相关版本:
Spring事务有两种方式:
@Transactional
注解的类生成一个代理类对象,注入使用这个bean就是使用代理类对象。ThreadLocal
里,通过取ThreadLocal
的事务信息实现方法嵌套调用中的事务信息传递。以下这张流程图来自Spring官网:
Spring事务处理有以下几个重要组件:
代理对象:假设SimpleServiceA是原始类,那么在Spring初始化bean时会进行增强生成代理类SimpleServiceA$$EnhancerBySpringCGLIB$$22b9f492
的实例,代理对象对目标方法进行功能拦截器,自动完成事务的开启、提交和回滚。
事务拦截器 TransactionInterceptor
:事务拦截器将事务管理逻辑与业务代码进行解耦,作用类似于Spring AOP的一个拦截器,用于在方法调用前后实现事务管理。
事务管理器 PlatformTransactionManager
: Spring事务管理器负责管理事务的生命周期,包括事务的开始、提交或回滚等操作,其中最常用的事务管理器实现类是DataSourceTransactionManager,DataSourceTransactionManager除了上述的功能,还提供了方法使用数据源去获取和返还数据连接。
事务同步管理器 TransactionSynchronizationManager
:Spring 框架中用于管理事务同步的工具类,将事务信息和状态和线程绑定。
Spring事务组件关系图:
Spring事务在处理流程中有一系列的信息和状态类,让人眼花缭乱,下面来列举这些类,理清它们的关系。
txInfo
,内含TransactionAttribute(事务定义)、TransactionStatus(事务状态)和事务管理器。status
,实现类为DefaultTransactionStatus
,内含事务对象DataSourceTransactionObject
、savepoint
保存点和挂起的数据库连接资源suspendedResources
。txObject
或transaction
,内含数据库连接ConnectionHolder
。方法嵌套调用的情况下,每个事务拦截器进来后都会创建方法自身的TransactionInfo、TransactionStatus、DataSourceTransactionObject。
事务有关的信息和状态类关系图:
在事务处理中事务的定义类TransactionDefinition有很多衍生类和接口,它们的属性从事务注解@Transactional解析而来,在原码里名称为txAttr
,关系如下:
其中DelegatingTransactionAttribute的类图:
图上那一系列类和接口,虽然比较绕,但都是和事务的定义有关。
事务对象跟踪和管理与事务相关的状态和资源,包含数据库连接connectionHolder
,实现类为DataSourceTransactionObject
,在源码里简称为txObject
或transaction
,它的类图如下:
事务管理器 PlatformTransactionManager 是Spring负责管理事务的生命周期的事务管理器,包括事务的开始、提交或回滚等操作,通过将事务信息和数据库连接绑定至线程实现多个嵌套方法间的传播。
事务管理器内部存放着数据源DataSource,能通过数据源获取和返还数据库连接,对事务的处理就是通过数据源的连接连接进行的。
事务管理器实现类有 DataSourceTransactionManager、JtaTransactionManager、WebSphereUowTransactionManager等,此处分析的是最常用的实现类DataSourceTransactionManager。
事务管理器类图:
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface Transactional { @AliasFor("transactionManager") String value() default ""; @AliasFor("value") String transactionManager() default ""; Propagation propagation() default Propagation.REQUIRED; Isolation isolation() default Isolation.DEFAULT; int timeout() default TransactionDefinition.TIMEOUT_DEFAULT; boolean readOnly() default false; Class<? extends Throwable>[] rollbackFor() default {}; String[] rollbackForClassName() default {}; Class<? extends Throwable>[] noRollbackFor() default {}; String[] noRollbackForClassName() default {}; }
@Target({ElementType.METHOD ElementType.TYPE})
: 指定此注解可以应用的位置,可以是方法或类型(类和接口)。
@Retention(RetentionPolicy.RUNTIME)
: 表示此注解应在运行时保留,允许在运行时反射检查它。
注解属性:
transactionManager
的别名,和transactionManager
的作用一样。传播方式 | 备注 |
---|---|
REQUIRED | 当前有事务时沿用现有事务,不存在的时候新建一个事务,是默认传播方式。 |
SUPPORTS | 当前有事务时沿用现有事务,没有的时候不使用事务。 |
MANDATORY | 当前有事务时沿用现有事务,不存在时抛出异常。 |
REQUIRES_NEW | 创建新事务,若存在原有事务则挂起原事务。 |
NOT_SUPPORTED | 不使用事务,若存在原有事务则挂起原事务。 |
NEVER | 不使用事务,若存在原有事务则抛出异常。 |
NESTED | 开始一个 “嵌套的” 事务, 它是已经存在事务的一个真正的子事务, 嵌套事务开始执行时, 它将取得一个 savepoint。 如果这个嵌套事务失败, 我们将回滚到此 savepoint。嵌套事务是外部事务的一部分, 只有外部事务结束后它才会被提交。 |
隔离级别 | 中文名称 | 备注 |
---|---|---|
READ_UNCOMMITTED | 读未提交 | 最低的隔离级别,一个事务可以读取另一个事务未提交的数据。可能会导致脏读、幻读或不可重复读。 |
READ_COMMITTED | 读已提交 | 一个事务只能读取已经提交的数据,但是在同一事务中,多次读取同一数据可能会得到不同的结果。 |
REPEATABLE_READ | 可重复读 | 一个事务在同一数据上进行多次读取时,可以得到相同的结果,但是在同一事务中,其他事务插入的数据对该事务不可见。 |
SERIALIZABLE | 串行化 | 最高的隔离级别,所有事务串行执行,保证数据的一致性和完整性。 |
Spring实例化bean时生成事务增强的代理类,如下图的例子,SimpleController里的@Autowired SimpleServiceA simpleServiceA
注入bean不是原始的类实例,而是一个Spring实例化bean时使用Cglib生成代理类,本例里是
SimpleServiceA$$EnhancerBySpringCGLIB$$1577407e
。这个代理类继承自SimpleServiceA
,它的作用是向类增加功能拦截器进行链式处理,对事务注解来说就是添加一个TransactionInterceptor
增加事务的处理。
这个代理类编译或打包后是搜索不到的,在Spring启动后动态生成,在Spring应用启动运行时可通过arthas或jad等工具反编译得到。
从这个类的代码可以看到代理类继承原类SimpleServiceA
,代理类的updateA()
对原始updateA()
方法进行拦截。
代理类代码:
public class SimpleServiceA$$EnhancerBySpringCGLIB$$22b9f492 extends SimpleServiceA implements SpringProxy, Advised, Factory { private boolean CGLIB$BOUND; public static Object CGLIB$FACTORY_DATA; private static final ThreadLocal CGLIB$THREAD_CALLBACKS; private static final Callback[] CGLIB$STATIC_CALLBACKS; private MethodInterceptor CGLIB$CALLBACK_0; private MethodInterceptor CGLIB$CALLBACK_1; private NoOp CGLIB$CALLBACK_2; private Dispatcher CGLIB$CALLBACK_3; private Dispatcher CGLIB$CALLBACK_4; private MethodInterceptor CGLIB$CALLBACK_5; private MethodInterceptor CGLIB$CALLBACK_6; private static Object CGLIB$CALLBACK_FILTER; private static final Method CGLIB$updateA$0$Method; private static final MethodProxy CGLIB$updateA$0$Proxy; private static final Object[] CGLIB$emptyArgs; private static final Method CGLIB$equals$1$Method; private static final MethodProxy CGLIB$equals$1$Proxy; private static final Method CGLIB$toString$2$Method; private static final MethodProxy CGLIB$toString$2$Proxy; private static final Method CGLIB$hashCode$3$Method; private static final MethodProxy CGLIB$hashCode$3$Proxy; private static final Method CGLIB$clone$4$Method; private static final MethodProxy CGLIB$clone$4$Proxy; static { SimpleServiceA$$EnhancerBySpringCGLIB$$22b9f492.CGLIB$STATICHOOK144(); SimpleServiceA$$EnhancerBySpringCGLIB$$22b9f492.CGLIB$STATICHOOK143(); } static void CGLIB$STATICHOOK144() { try { return; } catch (Error | RuntimeException throwable) { throw throwable; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } static void CGLIB$STATICHOOK143() { CGLIB$THREAD_CALLBACKS = new ThreadLocal(); CGLIB$emptyArgs = new Object[0]; Class<?> clazz = Class.forName("cn.massivestars.service.impl.SimpleServiceA$$EnhancerBySpringCGLIB$$22b9f492"); Class<?> clazz2 = Class.forName("java.lang.Object"); Method[] methodArray = ReflectUtils.findMethods(new String[]{"equals", "(Ljava/lang/Object;)Z", "toString", "()Ljava/lang/String;", "hashCode", "()I", "clone", "()Ljava/lang/Object;"}, clazz2.getDeclaredMethods()); CGLIB$equals$1$Method = methodArray[0]; CGLIB$equals$1$Proxy = MethodProxy.create(clazz2, clazz, "(Ljava/lang/Object;)Z", "equals", "CGLIB$equals$1"); CGLIB$toString$2$Method = methodArray[1]; CGLIB$toString$2$Proxy = MethodProxy.create(clazz2, clazz, "()Ljava/lang/String;", "toString", "CGLIB$toString$2"); CGLIB$hashCode$3$Method = methodArray[2]; CGLIB$hashCode$3$Proxy = MethodProxy.create(clazz2, clazz, "()I", "hashCode", "CGLIB$hashCode$3"); CGLIB$clone$4$Method = methodArray[3]; CGLIB$clone$4$Proxy = MethodProxy.create(clazz2, clazz, "()Ljava/lang/Object;", "clone", "CGLIB$clone$4"); clazz2 = Class.forName("cn.massivestars.service.impl.SimpleServiceA"); //原始方法 CGLIB$updateA$0$Method = ReflectUtils.findMethods(new String[]{"updateA", "()V"}, clazz2.getDeclaredMethods())[0]; //代理方法 CGLIB$updateA$0$Proxy = MethodProxy.create(clazz2, clazz, "()V", "updateA", "CGLIB$updateA$0"); } } //代理方法和原始方法不一样,作了功能增强 public final void updateA() throws Exception { try { MethodInterceptor methodInterceptor = this.CGLIB$CALLBACK_0; if (methodInterceptor == null) { SimpleServiceA$$EnhancerBySpringCGLIB$$22b9f492.CGLIB$BIND_CALLBACKS(this); methodInterceptor = this.CGLIB$CALLBACK_0; } if (methodInterceptor != null) { //使用代理方法对原始方法updateA()进行了功能增强 Object object = methodInterceptor.intercept(this, CGLIB$updateA$0$Method, CGLIB$emptyArgs, CGLIB$updateA$0$Proxy); return; } //没有功能增强时使用原始方法 super.updateA(); return; } catch (Error | Exception | RuntimeException throwable) { throw throwable; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } //省略代码…… }
Spring在初始化bean实例的时候过检查一系列条件来确定是否需要对 Bean 进行代理,比如对有@Transactional注解的bean生成额外的增强的代理对象。由于篇幅问题下面的代码分析忽略Spring对一般bean初始化的过程,从生成bean的代理对象开始,即AbstractAutoProxyCreator#postProcessAfterInitialization()方法开始分析,可以跟着这个调用栈跟踪代理对象的生成。
spring bean功能自动代理部分:
public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupport implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean != null) { Object cacheKey = getCacheKey(bean.getClass(), beanName); if (!this.earlyProxyReferences.contains(cacheKey)) { //开始包裹成代理类 return wrapIfNecessary(bean, beanName, cacheKey); } } return bean; } protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { if (beanName != null && this.targetSourcedBeans.contains(beanName)) { return bean; } if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) { return bean; } if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) { this.advisedBeans.put(cacheKey, Boolean.FALSE); return bean; } //1、查找需要生成的功能增强 Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null); if (specificInterceptors != DO_NOT_PROXY) { this.advisedBeans.put(cacheKey, Boolean.TRUE); //2、创建代理 Object proxy = createProxy( bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean)); this.proxyTypes.put(cacheKey, proxy.getClass()); return proxy; } this.advisedBeans.put(cacheKey, Boolean.FALSE); return bean; } protected Object createProxy( Class<?> beanClass, String beanName, Object[] specificInterceptors, TargetSource targetSource) { if (this.beanFactory instanceof ConfigurableListableBeanFactory) { AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass); } //3、创建类代理工厂 ProxyFactory proxyFactory = new ProxyFactory(); proxyFactory.copyFrom(this); if (!proxyFactory.isProxyTargetClass()) { if (shouldProxyTargetClass(beanClass, beanName)) { proxyFactory.setProxyTargetClass(true); } else { evaluateProxyInterfaces(beanClass, proxyFactory); } } //4、构建增强 Advisor[] advisors = buildAdvisors(beanName, specificInterceptors); for (Advisor advisor : advisors) { proxyFactory.addAdvisor(advisor); } proxyFactory.setTargetSource(targetSource); customizeProxyFactory(proxyFactory); proxyFactory.setFrozen(this.freezeProxy); if (advisorsPreFiltered()) { proxyFactory.setPreFiltered(true); } //5、使用代理工厂创建代理 return proxyFactory.getProxy(getProxyClassLoader()); } }
代理工厂部分:
public class ProxyFactory extends ProxyCreatorSupport { public Object getProxy(ClassLoader classLoader) { //6、创建aop代理 //7、返回代理对象 return createAopProxy().getProxy(classLoader); } protected final synchronized AopProxy createAopProxy() { if (!this.active) { activate(); } //6.1 使用aop工厂创建aop代理 return getAopProxyFactory().createAopProxy(this); } } public class DefaultAopProxyFactory implements AopProxyFactory, Serializable { //6.2 创建aop代理 public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException { if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) { Class<?> targetClass = config.getTargetClass(); if (targetClass == null) { throw new AopConfigException("TargetSource cannot determine target class: " + "Either an interface or a target is required for proxy creation."); } if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) { return new JdkDynamicAopProxy(config); } //创建cglib aop代理对象 return new ObjenesisCglibAopProxy(config); } else { return new JdkDynamicAopProxy(config); } } }
cglib代理对象部分:
class ObjenesisCglibAopProxy extends CglibAopProxy { public ObjenesisCglibAopProxy(AdvisedSupport config) { super(config); } //7.1 返回增强的代理对象 public Object getProxy(ClassLoader classLoader) { if (logger.isDebugEnabled()) { logger.debug("Creating CGLIB proxy: target source is " + this.advised.getTargetSource()); } try { Class<?> rootClass = this.advised.getTargetClass(); Assert.state(rootClass != null, "Target class must be available for creating a CGLIB proxy"); //省略代码…… //创建增强器 Enhancer enhancer = createEnhancer(); if (classLoader != null) { enhancer.setClassLoader(classLoader); if (classLoader instanceof SmartClassLoader && ((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) { enhancer.setUseCache(false); } } //配置增强器 enhancer.setSuperclass(proxySuperClass); enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised)); enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE); enhancer.setStrategy(new ClassLoaderAwareUndeclaredThrowableStrategy(classLoader)); Callback[] callbacks = getCallbacks(rootClass); Class<?>[] types = new Class<?>[callbacks.length]; for (int x = 0; x < types.length; x++) { types[x] = callbacks[x].getClass(); } // fixedInterceptorMap only populated at this point, after getCallbacks call above enhancer.setCallbackFilter(new ProxyCallbackFilter( this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset)); enhancer.setCallbackTypes(types); //创建代理类并实例化 return createProxyClassAndInstance(enhancer, callbacks); } //省略代码... } protected Object createProxyClassAndInstance(Enhancer enhancer, Callback[] callbacks) { Class<?> proxyClass = enhancer.createClass(); Object proxyInstance = null; if (objenesis.isWorthTrying()) { try { //创建对象,objenesis创建对象时可以不调用类的构造函数 proxyInstance = objenesis.newInstance(proxyClass, enhancer.getUseCache()); } catch (Throwable ex) { logger.debug("Unable to instantiate proxy using Objenesis, " + "falling back to regular proxy construction", ex); } } //省略代码…… ((Factory) proxyInstance).setCallbacks(callbacks); return proxyInstance; } }
由于篇幅问题,对spring aop生成代理对象的内容这里只作简略介绍。
Spring的bean可以有多个功能增强,@Transactional
事务处理是其中一种,还有@Cacheable
缓存、@Async
异步执行和@Retryable
重试等;处理拥有多个功能增强的代理类,Spring使用了一种叫责任链的设计模式。
下面是有@Transactional
、@Cacheable
和@Retryable
三个功能增强的责任链:
下图为责任链链式处理的调用调用栈:
从代理对象开始是如何执行到事务拦截器的呢?从上面的代理类代码可以看到调用代理类的方法时,在有功能增强时实际调用的是MethodInterceptor
的intercept(..)
方法,MethodInterceptor
实现类是DynamicAdvisedInterceptor
,在intercept(..)
里,获取功能增强责任链,如果责任链不为空,创建CglibMethodInvocation
对象进行处理责任链。
从代理类到业务方法的时序图:
代理对象到业务方法过程源码:
public class SimpleServiceA$$EnhancerBySpringCGLIB$$22b9f492 extends SimpleServiceA implements SpringProxy , Advised, Factory { //代理方法和原始方法不一样,作了功能增强 public final void updateA() throws Exception { try { MethodInterceptor methodInterceptor = this.CGLIB$CALLBACK_0; if (methodInterceptor == null) { SimpleServiceA$$EnhancerBySpringCGLIB$$22b9f492.CGLIB$BIND_CALLBACKS(this); methodInterceptor = this.CGLIB$CALLBACK_0; } if (methodInterceptor != null) { //1、调用方法拦截器 Object object = methodInterceptor.intercept(this, CGLIB$updateA$0$Method, CGLIB$emptyArgs, CGLIB$updateA$0$Proxy); return; } super.updateA(); //没有功能增强时使用原始方法 return; } catch (Error | Exception | RuntimeException throwable) { throw throwable; } //省略代码…… } } private static class DynamicAdvisedInterceptor implements MethodInterceptor, Serializable { private final AdvisedSupport advised; public DynamicAdvisedInterceptor(AdvisedSupport advised) { this.advised = advised; } @Override public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable { Object oldProxy = null; boolean setProxyContext = false; Class<?> targetClass = null; Object target = null; try { if (this.advised.exposeProxy) { oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true; } target = getTarget(); if (target != null) { targetClass = target.getClass(); } //2、获取功能增强责任链 List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); Object retVal; if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) { //责任链为空 Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = methodProxy.invoke(target, argsToUse); } else { //3、责任链不为空,作责任链作链式处理 retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed(); } retVal = processReturnType(proxy, target, method, retVal); return retVal; } finally { if (target != null) { releaseTarget(target); } if (setProxyContext) { // Restore old proxy. AopContext.setCurrentProxy(oldProxy); } } } //省略代码…… } } private static class CglibMethodInvocation extends ReflectiveMethodInvocation { //4、处理责任链 public Object proceed() throws Throwable { if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) { //如果责任链已处理完成,调用业务方法 return invokeJoinpoint(); } //处理责任链的下一个功能增强 Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex); if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) { InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice; if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) { return dm.interceptor.invoke(this); } else { return proceed(); } } else { //5、调用具体的拦截器 return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this); } } } public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor { //6、调用事务拦截器 public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); //6.1 调用事务核心方法 return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); } //7、事务核心方法 //这个方法继承自TransactionAspectSupport,代码在TransactionAspectSupport protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { //省略代码…… if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { //如果必要,创建事务 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { //责任链的下一个拦截处理器,在最后一个拦截器处理完后就是业务代码 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { //出现异常回滚事务 completeTransactionAfterThrowing(txInfo, ex); throw ex; } //省略代码…… //提交事务 commitTransactionAfterReturning(txInfo); return retVal; } //省略代码…… } }
下面是一个有@Cacheable、@Retryable和@Transactional注解的调用栈,很清晰的展示代理对象功能增强的调用过程:
cn.massivestars.service.SimpleServiceA.updateA(SimpleServiceA.java:33) cn.massivestars.service.SimpleServiceA$$FastClassBySpringCGLIB$$6479dd5.invoke(<generated>) org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720) org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:280) org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) org.springframework.cache.interceptor.CacheInterceptor$1.invoke(CacheInterceptor.java:52) org.springframework.cache.interceptor.CacheAspectSupport.invokeOperation(CacheAspectSupport.java:345) org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:414) org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:327) org.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:61) org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:74) org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:157) org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:101) org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:118) org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655) cn.massivestars.service.SimpleServiceA$$EnhancerBySpringCGLIB$$3b7e878d.updateA(<generated>) cn.massivestars.controller.SimpleController.update(SimpleController.java:28)
事务的处理和我们自己手写的类似,使用try catch代码块包裹业务方法,在业务方法前开启事务,捕获异常时回滚事务,正常运行时最后提交事务。
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable { //事务核心方法 //这个方法继承自TransactionAspectSupport,代码在TransactionAspectSupport protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); //取事务管理器 final String joinpointIdentification = methodIdentification(method, targetClass); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { //如果必要,创建事务 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { //责任链的下一个拦截处理器,在最后一个拦截器处理完后就是业务代码 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { //出现异常回滚事务 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //还原为原来的事务信息txInfo cleanupTransactionInfo(txInfo); } //提交事务 commitTransactionAfterReturning(txInfo); return retVal; } //省略代码…… } }
有关的类和组件都已经介绍完毕,下面看看事务管理器处理事务的开始、提交和回滚是怎样实现。
开始事务流程如下:
存在事务,根据事务的传播方式作不同处理:
不存在事务,根据事务的传播方式作不同处理::
TransactionInfo txInfo
,将当前方法事务信息绑定到线程并保留原事务信息以便方法嵌套调用的时候可以恢复。开始事务源码(事务拦截器部分):
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor { protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { //如果事务有名称,将txAttr替换成有事务名称的DelegatingTransactionAttribute实例(相比原来的类多了事务名称) if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { //如果事务管理器不为null,开启事务 status = tm.getTransaction(txAttr); } } //配制事务信息,事务信息txInfo绑定至当前线程 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); } protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { txInfo.newTransactionStatus(status); } // 即使在这里没有创建新的事务,始终将 TransactionInfo 绑定到线程。 // 这确保了即使没有由此切面创建事务,TransactionInfo 堆栈也将被正确管理。 txInfo.bindToThread(); return txInfo; } protected final class TransactionInfo { private void bindToThread() { //绑定当前方法事务信息,保留原来事务信息以便当前方法事务结束后恢复 this.oldTransactionInfo = transactionInfoHolder.get(); transactionInfoHolder.set(this); } } //省略代码... }
开始事务源码(事务管理器部分):
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { //数据源 private DataSource dataSource; //此方法继承自AbstractPlatformTransactionManager,代码在AbstractPlatformTransactionManager public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { //创建一个新的事务对象transaction //如果线程已存在事务,事务对象transaction使用原有的连接 Object transaction = doGetTransaction(); if (definition == null) { definition = new DefaultTransactionDefinition(); } if (isExistingTransaction(transaction)) { //存在事务,根据事务的传播方式作不同处理 return handleExistingTransaction(definition, transaction, debugEnabled); } //省略代码... //以下代码为不存在事务时的处理 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { //如果事务传播方式为MANDATORY则抛出异常 throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { //这里挂起一个空事务是为了写法的一致性,没有实际作用 SuspendedResourcesHolder suspendedResources = suspend(null); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //真正开始事务 doBegin(transaction, definition); //同步事务信息 prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { //创建空事务对象,不真正去处理事务,只是将事务信息和状态绑定到线程 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } } protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); txObject.setConnectionHolder(conHolder, false); //将事务对象内的数据库连接设置为线程绑定的那个 return txObject; } protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { //原来不存在事务,从数据源获取连接 Connection newCon = this.dataSource.getConnection(); txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); //设置事务隔离级别 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); //如果设置了事务自动提交,改为手动提交 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); con.setAutoCommit(false); } txObject.getConnectionHolder().setTransactionActive(true); //设置事务超时时间 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } //将数据库连接绑定到线程 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { //如果出现异常,归还还数据库连接, if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } } }
嵌套事务是指在一个事务内部开启了另一个事务。当一个事务内部开启了另一个事务时,外部事务会暂时挂起,直到内部事务执行完成。在源码中多次出现suspend()
方法挂起原事务,挂起事务的目的在于:
支持嵌套事务:在某些事务传播行为下,比如PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED,Spring需要支持嵌套事务。当这些传播行为发生时,当前正在执行的事务可能需要被挂起,以便开始一个新的事务。挂起原事务可以暂时中断当前事务的执行,让新的事务能够开始执行。
保存原事务的状态:挂起原事务会保存原事务的状态信息,以便在新事务执行完毕后能够正确地恢复原事务的状态。这对于事务管理的一致性和可靠性非常重要。
处理事务异常:在执行新事务的过程中,可能会发生异常。挂起原事务可以确保异常的发生不会影响到原事务的状态,保证事务的正确回滚和异常处理。
挂起事务的有关源码:
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager { //该方法继承自父类AbstractPlatformTransactionManager,代码在父类AbstractPlatformTransactionManager protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; //挂起的数据库连接持有者 if (transaction != null) { suspendedResources = doSuspend(transaction); //挂起事务 } //将线程绑定的原事务信息清除,返回原事务信息作为挂起事务信息 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException ex) { //还原事务管理器注册的TransactionSynchronization对象 doResumeSynchronization(suspendedSynchronizations); throw ex; } catch (Error err) { //还原事务管理器注册的TransactionSynchronization对象 doResumeSynchronization(suspendedSynchronizations); throw err; } } else if (transaction != null) { // Transaction active but no synchronization active. Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } } //将事务对象的数据库连接设置为空,将线程绑定的数据库连接解绑 protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; txObject.setConnectionHolder(null); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(this.dataSource); return conHolder; } }
上面介绍了事务的挂起,既然有挂起就有恢复,挂起事务是和恢复事务配套使用的;在事务的提交processCommit()
和回滚processRollback()
的finally代码块都会执行cleanupAfterCompletion()
方法,这个方法里,如果存在挂起的事务信息时,会恢复挂起的事务。
恢复事务的有关源码:
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager { implements ResourceTransactionManager, InitializingBean { private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { //恢复事务 resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); } } protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder) throws TransactionException { if (resourcesHolder != null) { Object suspendedResources = resourcesHolder.suspendedResources; if (suspendedResources != null) { //恢复旧连接,绑定至线程 doResume(transaction, suspendedResources); } //恢复事务状态和信息 List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name); doResumeSynchronization(suspendedSynchronizations); } } } } public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { protected void doResume(Object transaction, Object suspendedResources) { ConnectionHolder conHolder = (ConnectionHolder) suspendedResources; //重新将原数据库连接绑定到线程 TransactionSynchronizationManager.bindResource(this.dataSource, conHolder); } }
下面来看看事务挂起和恢复的详细过程。假设有代理对象A和代理对象B:
当调用代理对象A的方法a()
时,方法a()
嵌套调用了代理对象B的方法b()
,这时候会产生事务的挂起和恢复。
挂起和恢复事务的时序图:
Spring提交通过事务管理器提交事务,通过事务对象txObject内含的数据库连接提交事务,在提交事务完成后将做下列操作:
事务提交调用栈:
源码:
//事务拦截器 public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor //该方法继承子类TransactionAspectSupport protected void commitTransactionAfterReturning(TransactionInfo txInfo) { if (txInfo != null && txInfo.hasTransaction()) { //通过事务管理器提交事务 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } } protected void cleanupTransactionInfo(TransactionInfo txInfo) { if (txInfo != null) { //恢复为旧的事务信息(方法嵌套调用会还原为调用栈中上一个方法的事务信息) txInfo.restoreThreadLocalStatus(); } } protected final class TransactionInfo { private void restoreThreadLocalStatus() { //恢复为旧的事务信息(方法嵌套调用会还原为调用栈中上一个方法的事务信息) transactionInfoHolder.set(this.oldTransactionInfo); } } //省略代码…… } //事务管理器 public class DataSourceTransactionManager extends AbstractPlatformTransactionManager public final void commit(TransactionStatus status) throws TransactionException { //如果事务状态为已完成,抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { processRollback(defStatus); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { processRollback(defStatus); if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } return; } processCommit(defStatus); //处理事务提交 } //该方法继承子类AbstractPlatformTransactionManager private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); //调用业务方法里注册TransactionSynchronization对象的beforeCommit()方法 triggerBeforeCommit(status); //调用业务方法里注册TransactionSynchronization对象的beforeCompletion()方法 triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } //释放在当前事务中之前定义的一个保存点。 if (status.hasSavepoint()) { status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { //如果是事务状态为新事务才提交(方法嵌套调用的话是在最外层的方法) doCommit(status); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { //调用业务方法里注册TransactionSynchronization对象的afterCompletion()方法 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { //调用业务方法里注册TransactionSynchronization对象的afterCompletion()方法 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { //调用业务方法里注册TransactionSynchronization对象的beforeCompletion()方法 triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { //调用业务方法里注册TransactionSynchronization对象的beforeCompletion()方法 triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; } try { //调用业务方法里注册TransactionSynchronization对象的afterCommit()方法 triggerAfterCommit(status); } finally { //调用业务方法里注册TransactionSynchronization对象的afterCommit()方法 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } } protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); try { con.commit(); //使用数据库连接提交事务 } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } } private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); //事务状态设置为已完成 if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { //如果是嵌套方法,恢复原来挂起的事务 resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); } } protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; //解绑线程已绑定的数据库连接 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(this.dataSource); } //重置数据库连接的设置 Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } if (txObject.isNewConnectionHolder()) { //返还数据库连接 DataSourceUtils.releaseConnection(con, this.dataSource); } txObject.getConnectionHolder().clear(); } }
在业务逻辑执行过程中出现异常时,事务拦截器会捕获异常,并根据事务注解的配置判断是否符合回滚的条件,若符合则会回滚事务。
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.hasTransaction()) { if (txInfo.transactionAttribute.rollbackOn(ex)) { //根据规则判断是否要回滚 try { //使用事务管理器回滚事务 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException ex2) { throw ex2; } catch (Error err) { throw err; } } else { //使用事务管理器提交事务 try { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException ex2) { throw ex2; } catch (Error err) { logger.error("Application exception overridden by commit error", ex); throw err; } } } } private void processRollback(DefaultTransactionStatus status) { try { try { triggerBeforeCompletion(status); if (status.hasSavepoint()) { status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { //如果事务状态为新事务才回滚(比如嵌套调用的最外层方法) doRollback(status); } else if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { doSetRollbackOnly(status); } } } catch (RuntimeException ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } catch (Error err) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw err; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); } finally { cleanupAfterCompletion(status); } } protected void doRollback(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); try { con.rollback(); //使用数据库连接回滚事务 } catch (SQLException ex) { throw new TransactionSystemException("Could not roll back JDBC transaction", ex); } }
判断当前异常是否符合注解@Transactional设置的rollbackFor和noRollbackFor,只有符合条件的异常才回滚事务。
public class RuleBasedTransactionAttribute extends DefaultTransactionAttribute { public boolean rollbackOn(Throwable ex) { RollbackRuleAttribute winner = null; int deepest = Integer.MAX_VALUE; //是否命中回滚规则 if (this.rollbackRules != null) { for (RollbackRuleAttribute rule : this.rollbackRules) { int depth = rule.getDepth(ex); if (depth >= 0 && depth < deepest) { deepest = depth; winner = rule; } } } //如果没有命中回滚规则,使用父类的默认回滚规则 if (winner == null) { return super.rollbackOn(ex); } //不是非回滚规则 return !(winner instanceof NoRollbackRuleAttribute); } //该方法继承自DefaultTransactionAttribute,代码在DefaultTransactionAttribute public boolean rollbackOn(Throwable ex) { //默认回滚的异常类型:RuntimeException和Error return (ex instanceof RuntimeException || ex instanceof Error); } }
从源码可知默认回滚的异常类型为RuntimeException
和Error
,当出现IOException
、NoClassDefFoundException
之类的非RuntimeException
异常时,默认情况事务并不会回滚,这可能会导致和我们预期的不一致。大多数情况下,推荐事务注解配置rollbackFor的回滚异常为Exception.class。
@Transactional(rollbackFor = Exception.class)
Spring在事务提交或回滚后使用cleanupAfterCompletion()
方法在事务完成后执行一些必要的清理操作,以确保下一个事务可以从一个干净的状态开始。
//事务管理器 public class DataSourceTransactionManager extends AbstractPlatformTransactionManager private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); //事务状态设置为已完成 if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { //如果是嵌套方法,恢复原来挂起的事务 resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); } } protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; //解绑线程已绑定的数据库连接 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(this.dataSource); } //重置数据库连接的设置 Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } if (txObject.isNewConnectionHolder()) { //返还数据库连接 DataSourceUtils.releaseConnection(con, this.dataSource); } txObject.getConnectionHolder().clear(); } }
Spring 的事务同步管理器 TransactionSynchronizationManager 是 Spring 框架中用于管理事务同步的工具类。它使用ThreadLocal存储信息,允许在事务的不同阶段注册回调,以执行特定的操作。主要作用包括以下几个方面:
事务同步回调注册: TransactionSynchronizationManager允许在事务的不同阶段注册回调方法,如在事务完成时、事务回滚时、事务提交前等。开发者可以通过该工具类注册相应的回调方法来执行特定的操作,例如在事务提交后清理资源、发送消息等。
线程绑定: TransactionSynchronizationManager是基于线程的,它允许将事务同步回调与当前线程进行绑定。这意味着只有与事务关联的线程才能触发事务同步回调,从而保证了线程安全性。
支持多个同步回调: TransactionSynchronizationManager支持注册多个事务同步回调,这些回调可以按照注册的顺序执行。这样可以方便地实现多个模块间的协作,每个模块都可以在适当的时候执行自己的逻辑。
提供事务状态管理: TransactionSynchronizationManager还提供了方法来获取当前事务的状态,如事务是否处于激活状态、是否已经完成、是否已经回滚等。这样可以帮助开发者根据当前事务的状态执行相应的操作。
与事务管理器配合使用: TransactionSynchronizationManager 通常与事务管理器(如 PlatformTransactionManager)配合使用,以确保事务同步回调的正确执行。它可以在事务的开始、提交、回滚等关键节点触发相应的回调,并在合适的时机执行事务同步逻辑。
线程绑定的信息:
public abstract class TransactionSynchronizationManager { //数据库连接持有者connectionHolder private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources"); //在业务方法里通过TransactionSynchronizationManager#registerSynchronization注册的同步执行方法 private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations"); //业务名称 private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<String>("Current transaction name"); //事务是否只读 private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<Boolean>("Current transaction read-only status"); //业务隔离级别 private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<Integer>("Current transaction isolation level"); //事务是否激活 private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<Boolean>("Actual transaction active"); }
绑定和解绑资源:
这里的资源是指数据库连接持有者。
public static void bindResource(Object key, Object value) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map<Object, Object> map = resources.get(); //如果绑定的map对象为空,初始化它 if (map == null) { map = new HashMap<Object, Object>(); resources.set(map); } Object oldValue = map.put(actualKey, value); // Transparently suppress a ResourceHolder that was marked as void... if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null; } if (oldValue != null) { throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } } public static Object unbindResource(Object key) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doUnbindResource(actualKey); if (value == null) { throw new IllegalStateException( "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; } private static Object doUnbindResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.remove(actualKey); if (map.isEmpty()) { resources.remove(); //清除线程绑定的数据库连接 } //连接为无效时将它设置为空 if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { value = null; } return value; }
在Spring出现事务失效最常见的是因为在同一个类内部的方法之间的调用,在上面已经分析过只有在使用代理对象调用业务方法才会有对应的功能增强,同一个类内方法间用的是this对象,不具备事务增强。
下面是一个失效例子:
@Service public class Caller { @Autowired SimpleService simpleService; public void call() { simpleService.updateA(); } } @Service public class SimpleService { public void updateA() { //这里调用updateB()等同于this.updateB(),使用的是this对象,不具备事务增强,事务失效 updateB(); } @Transactional(rollbackFor = Exception.class) public void updateB() { String sql = "update article set title = 'Today is a good day!' where id = 1"; jdbcTemplate.update(sql); } }
本文简述了Spring注解式事务@Transactional的原理,分析了关键代码,了解源码的实现能帮助我们更好的使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。