赞
踩
Spring 事务分为两种,声明式和编程式, 声明式就是 在 方法或者接口上加 @Transactional的注解, 这样就可以交给Spring 管理它的提交,回滚,等等, 编程式就是用spring提供的模板 ,通过回调方法实现. 不管使用哪一种,最后事务的执行入口都是TransactionInterceptor的invoke方法,这个在上一章Spring源码解析之-- 事务注解 处理流程 分析中有介绍的. 下面就从这个入口,开始分析如何开启事务
如下图,首先看一下事务的大致流程如下:
接下来再分析源码
invoke 方法就是一个入口, 主要就t是先获取对应的 targetClass, 然后适配类 TransactionAspectSupport 的invokeWithinTransaction 方法
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
/**
首先这里是需要获取 targetClass, 如果是代理类(如 CglibProxy或者 jdkProyx),那就 获取对应的 target class
**/
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
/**适配 类 TransactionAspectSupport 的invokeWithinTransaction 方法
这里是传入了一个回调函数,也就是具体的处理逻辑
**/
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
invokeWithinTransaction 这里主要做了以下几件事情:
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 获取事务属性,如果 transaction attribute 为null, 那这个方法就是 非事务类型 TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 根据事务属性确定对应的事务 final TransactionManager tm = determineTransactionManager(txAttr); // 这里是反应式事务,暂时不分析 if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) { ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> { if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) { throw new TransactionUsageException( "Unsupported annotated transaction on suspending function detected: " + method + ". Use TransactionalOperator.transactional extensions instead."); } ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); if (adapter == null) { throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType()); } return new ReactiveTransactionSupport(adapter); }); return txSupport.invokeWithinTransaction( method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm); } PlatformTransactionManager ptm = asPlatformTransactionManager(tm); // 获取方法唯一标识,这里的 descriptor 就是在 获取 事务属性txAttr时 设置进去的. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); /** 这里是 申明式事务 我们一般申明式 事务定义的是DataSourceTransactionManager 就不是CallbackPreferringPlatformTransactionManager,CallbackPreferringPlatformTransactionManager 是通过回调方法 实现事务的 **/ if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // 创建 TransactionInfo TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. // 执行被争强方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception // 异常回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 清除信息 cleanupTransactionInfo(txInfo); } if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } // 提交 commitTransactionAfterReturning(txInfo); return retVal; } // 编程式事务 else { Object result; final ThrowableHolder throwableHolder = new ThrowableHolder(); // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status); try { Object retVal = invocation.proceedWithInvocation(); if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } return retVal; } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } }
getTransactionAttribute 方法比较简单,就是获取事务属性并缓存, 如果事务存在,那就对事务 加一个属性描述,对应的值为 类名.方法名
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) { // 如果是Object.class 类,直接返回 if (method.getDeclaringClass() == Object.class) { return null; } /** 首先 去查看是否有缓存,getCacheKey 里面 就是一个单纯的 new 了一个对象 MethodClassKey,MethodClassKey重写了 toString() 方法, 主要就是 用了 method+targetClass **/ Object cacheKey = getCacheKey(method, targetClass); TransactionAttribute cached = this.attributeCache.get(cacheKey); // 如果不为null, 要么就是具体的事务属性,要么就是 默认的空属性, 如果是默认的空事务属性,那就 返回null if (cached != null) { if (cached == NULL_TRANSACTION_ATTRIBUTE) { return null; } else { return cached; } } else { // 如果是空 ,那就再次获取一遍 TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass); // 放入缓存, 如果 属性为null, 设置 对应的value 为NULL_TRANSACTION_ATTRIBUTE if (txAttr == null) { this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } else { // 放入属性之前, 对 事务属性 设置一个 描述,就是用 类名.方法名 String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass); if (txAttr instanceof DefaultTransactionAttribute) { ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification); } if (logger.isTraceEnabled()) { logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr); } this.attributeCache.put(cacheKey, txAttr); } return txAttr; } }
computeTransactionAttribute 方法主要是 寻找事务属性的, 主要流程如下:
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) { /** 这里要求 事务的方法是 public的,不然不生效 注解式事务里面 AnnotationTransactionAttributeSource 里面默认是 true 的 **/ if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) { return null; } /** 这里的方法可能是在一个接口上,所以这里是获取target class 的具体方法 比如: method 为IFoo.bar() , targetClass 是 DefaultFoo ,这里是需要获取 DefaultFoo.bar()方法 同时这里还处理了可能出现桥接的问题 如果 targetClass 为null ,那方法就不会变 **/ Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass); /** 第一次尝试: 先对 方法上 @Transactional 进行解析 解析 方法上的@Transactional属性,利用 springTransactionAnnotationParser 对 方法上的 @Transactional 进行解析里面的属性, 返回 TransactionAttribute **/ TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; } /** 第二次: 如果上一步没有获取到,那就可能在 target class上面 对 target class 进行解析, 如果找到对应的事务属性, 并且是 用户级别的方法,那就返回 **/ txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } /** 第三步 如果 在目标类上的 方法和类上都没有找到 对应的 事务属性 那就去 原始的接口上去寻找, 还是 先从 method 上面开始, 如果没有,那就看接口上有没有 配置事务属性 **/ if (specificMethod != method) { // Fallback is to look at the original method. txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } // Last fallback is the class of the original method. txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } } // 最后都没有找到,那就返回null return null; }
determineTransactionManager 方法的逻辑也比较简单:
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) { // 如果 事务属性 txAttr 为null, 那就直接返回 if (txAttr == null || this.beanFactory == null) { return getTransactionManager(); } // 获取对应的事务注解上的名字 String qualifier = txAttr.getQualifier(); if (StringUtils.hasText(qualifier)) { // 根据指定的事务注解name去查询 return determineQualifiedTransactionManager(this.beanFactory, qualifier); } //根据默认的事务注解name查询 else if (StringUtils.hasText(this.transactionManagerBeanName)) { return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName); } else { // 获取默认的 事务管理器 TransactionManager defaultTransactionManager = getTransactionManager(); if (defaultTransactionManager == null) { defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY); if (defaultTransactionManager == null) { defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class); this.transactionManagerCache.putIfAbsent( DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); } } return defaultTransactionManager; } } /** 这里从(cache->beanFactory)里面获取 指定名字的事务管理器 **/ private TransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) { // 这里首先也是先从缓存里面获取 TransactionManager txManager = this.transactionManagerCache.get(qualifier); //如果缓存不存在,那就从BeanFactory 里面获取,然后再放进缓存 if (txManager == null) { txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType( beanFactory, TransactionManager.class, qualifier); this.transactionManagerCache.putIfAbsent(qualifier, txManager); } return txManager; }
createTransactionIfNecessary 这个方法主要是判断是否有必要创建事务 ,
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { /** 如果没有指定名称,则将方法标识应用为事务名称,就是之前设置的 class.method **/ if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
getTransaction方法 比较核心了, 主要做了以下几件事:
1.获取事务
2. 如果当前线程存在事务,则转向嵌套事务处理
3. 事务的超时设置验证
4. 事务的传播属性验证
5. 隔离级别, timeout ,connectinHolder 等配置
6. 绑定到当前线程
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 如果传入的definition 为null , 就从新定义一个新的definition (StaticTransactionDefinition 类型) TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); //这里时获取 当前线程缓存在 threadlocal里面的 connection, 如果没有connection,那DataSourceTransactionObject 里面的 //connectionHolder 为null Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // 这里时判断如果上面的 connectionHolder 不为空,并且是有效的 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. // 这里是 为已经存在的 transaction创建一个 TransactionStatus对象 return handleExistingTransaction(def, transaction, debugEnabled); } // Check definition settings for new transaction. if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } //如果事务定义为MANDATORY,那就是必须在一个已有事务里面运行, 这里没有事务就会报错 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 这块空挂起,不做任何操作 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { // 这里就是开启事务 return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
doGetTransaction 方法主要就是为了获取当前线程的 connection, 通过查看缓存在 TransactionSynchronizationManager.class里面的resources 的threadLocal 里面的connection,如果没有就为null
后面doBegin 方法里面最好是绑定数据源,就是存放在一个 ThreadLocal里面, 里面存放的是Map,key 就是 datasource,Value 就是对应的connection
protected Object doGetTransaction() {
// 创建一个 DataSourceTransactionObject 对象
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 设置 此事务中是否允许保存点, isNestedTransactionAllowed() 为true,是在事务创建时设置的
txObject.setSavepointAllowed(isNestedTransactionAllowed());
/**获取当前线程的connection 缓存,如果不存在,那就为null
这里是一个Map 存储的, key 是datasource ,value 是connection
**/
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
suspend 方法主要是对事务进行挂起,对于挂起操作主要的目的是记录原有事务的状态,以便后续操作对原有事务的恢复.
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { // 如果当前线程的事务同步器是否是活跃状态 if (TransactionSynchronizationManager.isSynchronizationActive()) { /** 这里获取所有事务同步器的 快照,先悬挂住,并把当前线程清空,最后返回一个 new SuspendedResourcesHolder 对象 **/ List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { suspendedResources = doSuspend(transaction); } String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { // doSuspend failed - original transaction is still active... doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null) { // Transaction active but no synchronization active. Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } }
startTransaction方法 主要就是 开启新事务 ,详细逻辑在下面的doBegin
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 这里主要是构造transaction, 包括设置ConnectionHolder ,隔离级别,timeout ,如果是新连接,绑定到当前线程
doBegin(transaction, definition);
// 新同步事务的设置
prepareSynchronization(status, definition);
return status;
}
doBegin 主要是构造transaction, 包括设置ConnectionHolder ,隔离级别,timeout ,这类不是spring 完成,而是交给底层的数据连接去做的,如果是新连接,绑定到当前线程
protected void doBegin(Object transaction, TransactionDefinition definition) { // 转换为 DataSourceTransactionObject DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { // 如果没有数据连接connection if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 这里就是调用数据源进行获取 connection 并进行绑定 Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); // 对connection 设置 隔离级别和 是否只读属性 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). /** 这里就是关闭自动提交, 就是开启事务了,由Spring 控制提交 **/ if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } /** 设置只读事务,就是这事务内没有新增,修改,删除操作只有查询操作,不需要数据库锁等操作,减少数据库压力, 还有就是其他事务提交的数据,在"SET TRANSACTION READ ONLY" 是看不到的 **/ prepareTransactionalConnection(con, definition); // 配置 判断当前线程是否有事务的 标志 txObject.getConnectionHolder().setTransactionActive(true); // 设置timeout int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the connection holder to the thread. // 绑定这connection 到当前线程 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
当已经建立事务连接并完成了事务的提取后,我们需要将所有的事务信息统一记录在TransactionInfo 类型的实例里面,这个实例包含了目标方法开始前的所有状态信息,一旦事务执行失败,Spring 会通过TransactionInfo 类型的实例中的信息来进行回滚等后续工作.
方法prepareTransactionInfo 主要做了以下几件事:
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // We need a transaction for this method... if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // The transaction manager will flag an error if an incompatible tx already exists. txInfo.newTransactionStatus(status); } else { // The TransactionInfo.hasTransaction() method will return false. We created it only // to preserve the integrity of the ThreadLocal stack maintained in this class. if (logger.isTraceEnabled()) { logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional."); } } // We always bind the TransactionInfo to the thread, even if we didn't create // a new transaction here. This guarantees that the TransactionInfo stack // will be managed correctly even if no transaction was created by this aspect. txInfo.bindToThread(); return txInfo; }
本章主要介绍了 创建事务相关的源码解析, 下一章介绍 事务的提交、回滚、异常处理等.
支付宝 | 微信 |
---|---|
如果有帮助记得打赏哦 | 特别需要您的打赏哦 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。