当前位置:   article > 正文

从源码角度剖析 Spring 如何管理 mybatis 事务的?_spring容器如何管理mybatis的

spring容器如何管理mybatis的

一、XMLMapperBuilder、mapperProxy 与 mapperMethod

上篇文章讲了 mapper 文件是怎么解析的,在文章开头提到了 SqlSessionFactory 这个重要的对象,是的就是我们经常需要配置的:

  1. @Bean
  2. @ConditionalOnMissingBean
  3. public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
  4. // 略
  5. }

这里面做了很多自动化的配置,当然我们可以通过重写它来自定义我们自己的 sqlSessionFactory,借用一下上篇文章的图片:

spring 借助 SqlSessionFactoryBean 来创建 sqlSessionFactory,这可以视作是一个典型的建造者模式,来创建 SqlSessionFactory。

上篇文章说到,spring 拿到我们配置的 mapper 路径去扫描我们 mapper.xml 然后进行一个循环进行解析(上篇文章第二章节:二、SqlSessionFactory 的初始化与 XMLMapperBuilder):

  1. -- 代码位于 org.mybatis.spring.SqlSessionFactoryBean#buildSqlSessionFactory --
  2. if (this.mapperLocations != null) {
  3. if (this.mapperLocations.length == 0) {
  4. LOGGER.warn(() -> "Property 'mapperLocations' was specified but matching resources are not found.");
  5. } else {
  6. for (Resource mapperLocation : this.mapperLocations) {
  7. if (mapperLocation == null) {
  8. continue;
  9. }
  10. try {
  11. XMLMapperBuilder xmlMapperBuilder = new XMLMapperBuilder(mapperLocation.getInputStream(),
  12. targetConfiguration, mapperLocation.toString(), targetConfiguration.getSqlFragments());
  13. xmlMapperBuilder.parse();
  14. } catch (Exception e) {
  15. throw new NestedIOException("Failed to parse mapping resource: '" + mapperLocation + "'", e);
  16. } finally {
  17. ErrorContext.instance().reset();
  18. }
  19. LOGGER.debug(() -> "Parsed mapper file: '" + mapperLocation + "'");
  20. }
  21. }
  22. } else {
  23. LOGGER.debug(() -> "Property 'mapperLocations' was not specified.");
  24. }
  25. -- 代码位于 org.apache.ibatis.builder.xml.XMLMapperBuilder#parse --
  26. public void parse() {
  27. if (!configuration.isResourceLoaded(resource)) {
  28. configurationElement(parser.evalNode("/mapper")); // 上篇文章主要说的
  29. configuration.addLoadedResource(resource);
  30. bindMapperForNamespace();// 创建mapperProxy的工厂对象
  31. }
  32. parsePendingResultMaps();
  33. parsePendingCacheRefs();
  34. parsePendingStatements();
  35. }

1.1 从 xml 到 mapperStatement

上篇文章实际上就是在讲解 configurationElement(parser.evalNode("/mapper")); 里面发生的故事,实际上还有后续的步骤,如果对 mybatis 有所了解的,应该知道,mybatis 会为我们的接口创建一个叫做 mapperProxy 的代理对象(划重点),其实就是在这后续的步骤 bindMapperForNamespace(); 做的(不尽然,实际上是创建并绑定了 mapperProxyFactory)。

不贴太多代码,bindMapperForNamespace() 方法里核心做的主要就是调用 configuration.addMapper() 方法

  1. if (boundType != null) {
  2. if (!configuration.hasMapper(boundType)) {
  3. // Spring may not know the real resource name so we set a flag
  4. // to prevent loading again this resource from the mapper interface
  5. // look at MapperAnnotationBuilder#loadXmlResource
  6. configuration.addLoadedResource("namespace:" + namespace);
  7. configuration.addMapper(boundType);
  8. }
  9. }

这个 boundType 就是我们在 mapper 文件里面指定的 namespace,比如:

  1. <mapper namespace="com.anur.mybatisdemo.test.TrackerConfigMapper">
  2. XXXXXXXXXXXXXXXXXX 里面写的sql语句,resultMap 等等,略
  3. </mapper>

在 configuration.addMapper() 中调用了 mapperRegistry.addMapper(),看到 knowMappers ,这个就是存储我们生产 MapperProxy 的工厂映射 map,我们稍微再讲,先继续往下看。

  1. public <T> void addMapper(Class<T> type) {
  2. if (type.isInterface()) {
  3. if (hasMapper(type)) {
  4. throw new BindingException("Type " + type + " is already known to the MapperRegistry.");
  5. }
  6. boolean loadCompleted = false;
  7. try {
  8. knownMappers.put(type, new MapperProxyFactory<>(type));
  9. // It's important that the type is added before the parser is run
  10. // otherwise the binding may automatically be attempted by the
  11. // mapper parser. If the type is already known, it won't try.
  12. MapperAnnotationBuilder parser = new MapperAnnotationBuilder(config, type);
  13. parser.parse();
  14. loadCompleted = true;
  15. } finally {
  16. if (!loadCompleted) {
  17. knownMappers.remove(type);
  18. }
  19. }
  20. }
  21. }

1.2 从注解到 mapperStatement

看到 MapperAnnotationBuilder#parse(),parse() 中主要是对这个接口里面定义的方法做了 parseStatement 这件事

  1. for (Method method : methods) {
  2. try {
  3. // issue #237
  4. if (!method.isBridge()) {
  5. parseStatement(method);
  6. }
  7. } catch (IncompleteElementException e) {
  8. configuration.addIncompleteMethod(new MethodResolver(this, method));
  9. }
  10. }

parseStatement() 就是解析注解语句的地方, 如果说我们没有写 xml,将语句以注解的形式写在方法上,则会在这里进行语句解析。它和我们上篇文章讲到的解析xml很像,就是拿到一大堆属性,比如 resultMap,keyGenerator 等等,生成一个 MappedStatement 对象,这里就不赘述了。

  1. void parseStatement(Method method) {
  2. Class<?> parameterTypeClass = getParameterType(method);
  3. LanguageDriver languageDriver = getLanguageDriver(method);
  4. SqlSource sqlSource = getSqlSourceFromAnnotations(method, parameterTypeClass, languageDriver);
  5. if (sqlSource != null) {
  6. // 解析注解式的 sql 语句,略
  7. }
  8. }

1.3 如果写了 xml,也写了注解会怎么样(调皮)

我们知道承载 mapperStatement 的是一个 map 映射,通过我们在上篇文章中反复强调的 id 来作为 key,那么重复添加会出现什么呢?

答案在这里,mybatis 的这个 map 被重写了,同时写这两者的话,会抛出 ...already contains value for... 的异常

  1. -- 代码位置 org.apache.ibatis.session.Configuration.StrictMap#put --
  2. @Override
  3. @SuppressWarnings("unchecked")
  4. public V put(String key, V value) {
  5. if (containsKey(key)) {
  6. throw new IllegalArgumentException(name + " already contains value for " + key
  7. + (conflictMessageProducer == null ? "" : conflictMessageProducer.apply(super.get(key), value)));
  8. }
  9. if (key.contains(".")) {
  10. final String shortKey = getShortName(key);
  11. if (super.get(shortKey) == null) {
  12. super.put(shortKey, value);
  13. } else {
  14. super.put(shortKey, (V) new Ambiguity(shortKey));
  15. }
  16. }
  17. return super.put(key, value);
  18. }

1.4 回到 MapperProxy

1.4.1 MapperProxy 的创建

刚才在1.1中我们提到了,mapperProxy,也就是刚才 
org.apache.ibatis.binding.MapperRegistry#addMapper 的代码:knownMappers.put(type, new MapperProxyFactory<>(type));

看到 MapperProxyFactory 的内部:

  1. -- 有删减 --
  2. public class MapperProxyFactory<T> {
  3. private final Class<T> mapperInterface;
  4. private final Map<Method, MapperMethod> methodCache = new ConcurrentHashMap<>();
  5. public MapperProxyFactory(Class<T> mapperInterface) {
  6. this.mapperInterface = mapperInterface;
  7. }
  8. @SuppressWarnings("unchecked")
  9. protected T newInstance(MapperProxy<T> mapperProxy) {
  10. return (T) Proxy.newProxyInstance(mapperInterface.getClassLoader(), new Class[] { mapperInterface }, mapperProxy);
  11. }
  12. public T newInstance(SqlSession sqlSession) {
  13. final MapperProxy<T> mapperProxy = new MapperProxy<>(sqlSession, mapperInterface, methodCache);
  14. return newInstance(mapperProxy);
  15. }
  16. }

了解JDK动态代理的小伙伴应该很清楚了, newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) 意为,为接口创建一个实现了 InvocationHandler 的代理对象。我们在调用接口方法的时候,实际上要看代理类是如何实现的。

那么看看 mapperProxy 的内部的 invoke 是如何实现的,这里有三类方法,

  • 一种是一些 Object 对象带来的方法,这里不进行代理,直接 invoke,
  • 一种是default方法,一种比较蛋疼的写法,把接口当抽象类写,里面可以放一个default方法写实现,这种代理了也没太大意义
  • 最后一种也就是我们准备代理的方法, 它会为每个非上面两者的方法,懒加载一个 MapperMethod 对象,并调用 MapperMethod#execute 来执行真正的 mybatis 逻辑。

1.4.2 MapperMethod 的创建

  1. -- 有删减 --
  2. public class MapperProxy<T> implements InvocationHandler, Serializable {
  3. public MapperProxy(SqlSession sqlSession, Class<T> mapperInterface, Map<Method, MapperMethod> methodCache) {
  4. this.sqlSession = sqlSession;
  5. this.mapperInterface = mapperInterface;
  6. this.methodCache = methodCache;
  7. }
  8. @Override
  9. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  10. try {
  11. if (Object.class.equals(method.getDeclaringClass())) {// 来自 Object 的方法,比如 toString()
  12. return method.invoke(this, args);
  13. } else if (method.isDefault()) {// 静态方法,我们可以直接忽略
  14. if (privateLookupInMethod == null) {
  15. return invokeDefaultMethodJava8(proxy, method, args);
  16. } else {
  17. return invokeDefaultMethodJava9(proxy, method, args);
  18. }
  19. }
  20. } catch (Throwable t) {
  21. throw ExceptionUtil.unwrapThrowable(t);
  22. }
  23. final MapperMethod mapperMethod = cachedMapperMethod(method);
  24. return mapperMethod.execute(sqlSession, args);
  25. }
  26. private MapperMethod cachedMapperMethod(Method method) {
  27. return methodCache.computeIfAbsent(method,
  28. k -> new MapperMethod(mapperInterface, method, sqlSession.getConfiguration()));
  29. }
  30. }

MapperMethod 的逻辑是怎么样的,也很好猜到,它的构造函数中创建了两个对象,

  1. public class MapperMethod {
  2. private final SqlCommand command;
  3. private final MethodSignature method;
  4. public MapperMethod(Class<?> mapperInterface, Method method, Configuration config) {
  5. this.command = new SqlCommand(config, mapperInterface, method);
  6. this.method = new MethodSignature(config, mapperInterface, method);
  7. }
  • sqlCommand

sqlCommand 实际上就是从 configuration 里面把它对应的 MappedStatement 取出来,持有它的唯一 id 和执行类型。

  1. public static class SqlCommand {
  2. private final String name;
  3. private final SqlCommandType type;
  4. public SqlCommand(Configuration configuration, Class<?> mapperInterface, Method method) {
  5. final String methodName = method.getName();
  6. final Class<?> declaringClass = method.getDeclaringClass();
  7. MappedStatement ms = resolveMappedStatement(mapperInterface, methodName, declaringClass,
  8. configuration);
  9. if (ms == null) {
  10. if (method.getAnnotation(Flush.class) != null) {
  11. name = null;
  12. type = SqlCommandType.FLUSH;
  13. } else {
  14. throw new BindingException("Invalid bound statement (not found): "
  15. + mapperInterface.getName() + "." + methodName);
  16. }
  17. } else {
  18. name = ms.getId();
  19. type = ms.getSqlCommandType();
  20. if (type == SqlCommandType.UNKNOWN) {
  21. throw new BindingException("Unknown execution method for: " + name);
  22. }
  23. }
  24. }
  • MethodSignature MethodSignature 是针对接口返回值、参数等值的解析,比如我们的 @Param 注解,就是在 new ParamNameResolver(configuration, method); 里面解析的,比较简单,在之前的文章 简单概括的mybatis sqlSession 源码解析 里也提到过,这里就不多说了。
  1. public MethodSignature(Configuration configuration, Class<?> mapperInterface, Method method) {
  2. Type resolvedReturnType = TypeParameterResolver.resolveReturnType(method, mapperInterface);
  3. if (resolvedReturnType instanceof Class<?>) {
  4. this.returnType = (Class<?>) resolvedReturnType;
  5. } else if (resolvedReturnType instanceof ParameterizedType) {
  6. this.returnType = (Class<?>) ((ParameterizedType) resolvedReturnType).getRawType();
  7. } else {
  8. this.returnType = method.getReturnType();
  9. }
  10. this.returnsVoid = void.class.equals(this.returnType);
  11. this.returnsMany = configuration.getObjectFactory().isCollection(this.returnType) || this.returnType.isArray();
  12. this.returnsCursor = Cursor.class.equals(this.returnType);
  13. this.returnsOptional = Optional.class.equals(this.returnType);
  14. this.mapKey = getMapKey(method);
  15. this.returnsMap = this.mapKey != null;
  16. this.rowBoundsIndex = getUniqueParamIndex(method, RowBounds.class);
  17. this.resultHandlerIndex = getUniqueParamIndex(method, ResultHandler.class);
  18. this.paramNameResolver = new ParamNameResolver(configuration, method);
  19. }

1.4.3 MapperMethod 的执行

mapperMethod 就是 sqlSession 与 mappedStatement 的一个整合。它的执行是一个策略模式:

  1. public Object execute(SqlSession sqlSession, Object[] args) {
  2. Object result;
  3. switch (command.getType()) {
  4. case INSERT: {
  5. Object param = method.convertArgsToSqlCommandParam(args);
  6. result = rowCountResult(sqlSession.insert(command.getName(), param));
  7. break;
  8. }
  9. case UPDATE: {
  10. Object param = method.convertArgsToSqlCommandParam(args);
  11. result = rowCountResult(sqlSession.update(command.getName(), param));
  12. break;
  13. }
  14. case DELETE: {
  15. Object param = method.convertArgsToSqlCommandParam(args);
  16. result = rowCountResult(sqlSession.delete(command.getName(), param));
  17. break;
  18. }
  19. case SELECT:
  20. // 略..
  21. }

具体是怎么执行的在文章 简单概括的mybatis sqlSession 源码解析 提到过,这里也不过多赘述。

这里对 MapperProxy 在初始化与调用过程中的关系做一下罗列:

二、 下文序言

上面的 MapperProxy 讲解的比较粗略,因为真的很简单(复杂一点的在 MepperMethod 的策略模式,也就是调用 sqlSession去执行语句的时候,但是那个本文不会详细说明,后续的文章会解析这部分代码)

本文要讲的是几个在很多文章或者书里都没有提到,或者只是简单提了一下的点:本文将会把 sqlSession、MapperProxy、Spring事务管理几个关联密切的功能点进行总结,比如如下这样的疑问:

  • 1、我们知道一个 sqlSession 对应一个数据库连接,在创建 MapperProxy 的时候,又注入了 sqlSession ,难道我们用的一直是同一个 sqlSession?或者难道每次使用不同的数据库连接,会创建不同的 MapperProxy 代理?
  • 2、事务传播等级是怎么实现的,和 sqlSession 有关系吗?
  • 3、代理对象 MapperProxy 是如何和 spring 产生关联的?

三、 SqlSession 的初始化及其运作总览

为了避免有小伙伴对 sqlSession 完全没有概念,这里将接口代码贴出,可以看出 sqlSession 是执行语句的一个入口,同时也提供了事务的一些操作,实际上就是如此:

  1. public interface SqlSession extends Closeable {
  2. <T> T selectOne(String statement);
  3. <T> T selectOne(String statement, Object parameter);
  4. <E> List<E> selectList(String statement);
  5. <E> List<E> selectList(String statement, Object parameter);
  6. <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds);
  7. <K, V> Map<K, V> selectMap(String statement, String mapKey);
  8. <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey);
  9. <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds);
  10. <T> Cursor<T> selectCursor(String statement);
  11. <T> Cursor<T> selectCursor(String statement, Object parameter);
  12. <T> Cursor<T> selectCursor(String statement, Object parameter, RowBounds rowBounds);
  13. void select(String statement, Object parameter, ResultHandler handler);
  14. void select(String statement, ResultHandler handler);
  15. void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler);
  16. int insert(String statement);
  17. int insert(String statement, Object parameter);
  18. int update(String statement);
  19. int update(String statement, Object parameter);
  20. int delete(String statement);
  21. int delete(String statement, Object parameter);
  22. void commit();
  23. void commit(boolean force);
  24. void rollback();
  25. void rollback(boolean force);
  26. List<BatchResult> flushStatements();
  27. void close();
  28. void clearCache();
  29. Configuration getConfiguration();
  30. <T> T getMapper(Class<T> type);
  31. Connection getConnection();
  32. }

3.1 sqlSession 的创建

3.1.1 Environment 与 Transaction

首先忘掉 spring 为我们提供的便利,看一下基础的,脱离了 spring 托管的 mybatis 是怎么进行 sql 操作的:

  1. SqlSession sqlSession = sqlSessionFactory.openSession();
  2. TrackerConfigMapper mapper = sqlSession.getMapper(TrackerConfigMapper.class);
  3. TrackerConfigDO one = mapper.getOne(1);

SqlSessionFactory 有两个子类实现:DefaultSqlSessionFactory 和 SqlSessionManager,SqlSessionManager 使用动态代理 + 静态代理对 DefaultSqlSessionFactory 进行了代理,不过不用太在意这个 SqlSessionManager,后面会说明原因。

上面不管怎么代理,实际逻辑的执行者都是 DefaultSqlSessionFactory,我们看看它的创建方法,也就是 openSession() 实际执行的方法:

  1. private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
  2. Transaction tx = null;
  3. try {
  4. final Environment environment = configuration.getEnvironment();
  5. final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
  6. tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
  7. final Executor executor = configuration.newExecutor(tx, execType);
  8. return new DefaultSqlSession(configuration, executor, autoCommit);
  9. } catch (Exception e) {
  10. closeTransaction(tx); // may have fetched a connection so lets call close()
  11. throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
  12. } finally {
  13. ErrorContext.instance().reset();
  14. }
  15. }

environment 可用于数据源切换,那么提到数据源切换,就很容易想到了,连接的相关信息是这货维持的。 所以看到我们的代码: tx = 
transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);, TransactionFactory 有三个实现,它们分别是 JdbcTransactionFactory、ManagedTransactionFactory 和 SpringManagedTransactionFactory。

JdbcTransactionFactory 和 ManagedTransactionFactory 最大的区别就在于 ManagedTransactionFactory 实现了空的 commit 与 rollback,源码中这样说道:付与容器来管理 transaction 的生命周期,这个博主不是特别熟悉,因为没这么用过,tomcat、jetty 等容器实现了对 jdbc 的代理。要注意,不管如何都是使用的 jdbc 这套接口规范进行数据库操作的。

  1. /**
  2. * {@link Transaction} that lets the container manage the full lifecycle of the transaction.
  3. * Delays connection retrieval until getConnection() is called.
  4. * Ignores all commit or rollback requests.
  5. * By default, it closes the connection but can be configured not to do it.
  6. *
  7. * @author Clinton Begin
  8. *
  9. * @see ManagedTransactionFactory
  10. */

Transaction 是 mybatis 创建的一个对象,它实际上是对 jdbc connection 对象的一个封装:

  1. -- 代码位于 org.apache.ibatis.transaction.jdbc.JdbcTransaction --
  2. @Override
  3. public Connection getConnection() throws SQLException {
  4. if (connection == null) {
  5. openConnection();
  6. }
  7. return connection;
  8. }
  9. @Override
  10. public void commit() throws SQLException {
  11. if (connection != null && !connection.getAutoCommit()) {
  12. if (log.isDebugEnabled()) {
  13. log.debug("Committing JDBC Connection [" + connection + "]");
  14. }
  15. connection.commit();
  16. }
  17. }
  18. @Override
  19. public void rollback() throws SQLException {
  20. if (connection != null && !connection.getAutoCommit()) {
  21. if (log.isDebugEnabled()) {
  22. log.debug("Rolling back JDBC Connection [" + connection + "]");
  23. }
  24. connection.rollback();
  25. }
  26. }

3.1.2 Executor 与 SqlSession

我们知道 sqlSession 的 四大对象之一,Executor,负责统领全局,从语句获取(从 mappedStatement),到参数拼装(parameterHandler),再到执行语句(statementHandler),最后结果集封装(resultHandler),都是它负责“指挥”的。

我们看到它使用 Transaction 进行初始化,另外的一个参数是它的类型,这里不多说,REUSE 是带语句缓存的,和普通的 SimpleExecutor 没有特别大的区别,BATCH 类型则是通过 jdbc 提供的批量提交来对网络请求进行优化。

public enum ExecutorType {  SIMPLE, REUSE, BATCH}

最后将持有 Transaction 的 Executor 置入 SqlSession ,完成一个 SqlSession 对象的创建。

可以看到,我们的确是一个SqlSession 对应一个连接(Transaction),MapperProxy 这个业务接口的动态代理对象又持有一个 SqlSession 对象,那么总不可能一直用同一个连接吧?

当然有疑问是好的,而且通过对 SqlSession 初始化过程的剖析,我们已经完善了我们对 mybatis 的认知:

接下来就是来打消这个疑问,MapperProxy 持有的 sqlSession 和 SqlSessionFactory 创建的这个有什么关系?

3.2 SqlSessionTemplate 对 sqlSession 的代理

实际上答案就在 SqlSessionTemplate,SqlSessionTemplate 是 spring 对 mybatis SqlSessionFactory 的封装,同时,它还是 SqlSession 的代理。

SqlSessionTemplate 和 mybatis 提供的 SqlSessionManager( SqlSessionFactory 的另一个实现类,也是DefaultSqlSessionFactory 的代理类,可以细想一下,业务是否共用同一个 sqlSession 还要在业务里面去传递,去控制是不是很麻烦) 是一样的思路,不过 spring 直接代理了 sqlSession:

  1. -- 代码位于 org.mybatis.spring.SqlSessionTemplate --
  2. private final SqlSessionFactory sqlSessionFactory;
  3. private final ExecutorType executorType;
  4. private final SqlSession sqlSessionProxy;
  5. private final PersistenceExceptionTranslator exceptionTranslator;
  6. /**
  7. * Constructs a Spring managed {@code SqlSession} with the given
  8. * {@code SqlSessionFactory} and {@code ExecutorType}.
  9. * A custom {@code SQLExceptionTranslator} can be provided as an
  10. * argument so any {@code PersistenceException} thrown by MyBatis
  11. * can be custom translated to a {@code RuntimeException}
  12. * The {@code SQLExceptionTranslator} can also be null and thus no
  13. * exception translation will be done and MyBatis exceptions will be
  14. * thrown
  15. *
  16. * @param sqlSessionFactory a factory of SqlSession
  17. * @param executorType an executor type on session
  18. * @param exceptionTranslator a translator of exception
  19. */
  20. public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
  21. PersistenceExceptionTranslator exceptionTranslator) {
  22. notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required");
  23. notNull(executorType, "Property 'executorType' is required");
  24. this.sqlSessionFactory = sqlSessionFactory;
  25. this.executorType = executorType;
  26. this.exceptionTranslator = exceptionTranslator;
  27. this.sqlSessionProxy = (SqlSession) newProxyInstance(
  28. SqlSessionFactory.class.getClassLoader(),
  29. new Class[] { SqlSession.class },
  30. new SqlSessionInterceptor());
  31. }

还是熟悉的配方,就是 jdk 的动态代理,SqlSessionTemplate 在初始化时创建了一个 SqlSession 代理,也内置了 ExecutorType,SqlSessionFactory 等 defaultSqlSession 初始化的必要组件。

想必看到这里,已经有很多小伙伴知道这里是怎么回事了,是的,我们对 SqlSession 的操作都是经由这个代理来完成,代理的内部,实现了真正 SqlSession 的创建与销毁,回滚与提交等,我们先纵览以下它的代理实现。

3.2.1 sqlSession 常规代理流程赏析

对于这种jdk动态代理,我们看到 SqlSessionInterceptor#invoke 方法就明了了。我们先过一遍常规的流程,也就是没有使用 spring 事务功能支持,执行完 sql 就直接提交事务的常规操作:

  • 1、getSqlSession() 创建 sqlSession
  • 2、执行 MapperProxy,也就是前面讲了一大堆的,MapperProxy 中,通过 MapperMethod 来调用 sqlSession 和我们生成好的 mappedStatement 操作 sql 语句。
  • 3、提交事务
  • 4、关闭事务

注:代码有很大删减

  1. private class SqlSessionInterceptor implements InvocationHandler {
  2. @Override
  3. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  4. SqlSession sqlSession = getSqlSession(
  5. SqlSessionTemplate.this.sqlSessionFactory,
  6. SqlSessionTemplate.this.executorType,
  7. SqlSessionTemplate.this.exceptionTranslator); // 创建或者获取真正需要的 SqlSession
  8. try {
  9. Object result = method.invoke(sqlSession, args); // 执行原本想对 SqlSession 做的事情
  10. if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
  11. // force commit even on non-dirty sessions because some databases require
  12. // a commit/rollback before calling close()
  13. sqlSession.commit(true);// 如非 spring 管理事务,则直接提交
  14. } finally {
  15. if (sqlSession != null) {
  16. closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
  17. }
  18. }
  19. }
  20. }

注意:注释掉的代码在此类型的操作中没有什么意义,getSqlSession() 在这里只是简单通过 sessionFactory 创建了一个 sqlSession:

  1. public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
  2. // SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
  3. // SqlSession session = sessionHolder(executorType, holder);
  4. // if (session != null) {
  5. // return session;
  6. // }
  7. LOGGER.debug(() -> "Creating a new SqlSession");
  8. session = sessionFactory.openSession(executorType);
  9. // registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
  10. return session;
  11. }

3.2.2 sqlSession 借助 TransactionSynchronizationManager 代理流程赏析

看完前面的实现,有小伙伴会好奇,我的 @Transactional 注解呢?我的事务传播等级呢?

实际上,除去上述常规流程,更多的是要借助 
TransactionSynchronizationManager 这个对象来完成,比如刚才步骤一,getSqlSession() 我暂时注释掉的代码里面,有一个很重要的操作:

我们把刚才 getSqlSession() 中注释掉的代码再拿回来看看:

  1. SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
  2. SqlSession session = sessionHolder(executorType, holder);
  3. if (session != null) {
  4. return session;
  5. }
  6. session = sessionFactory.openSession(executorType);
  7. registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
  8. return session;

我们可以看到首先获取一个叫做 SqlSessionHolder 的东西,如果里面没有 sqlSession 则调用 
sessionFactory.openSession(executorType); 创建一个,并把它注册到 
TransactionSynchronizationManager

sqlSessionHolder 没什么可说的,它就只是个纯粹的容器,里面主要就是装着一个 SqlSession :

  1. public SqlSessionHolder(SqlSession sqlSession,
  2. ExecutorType executorType,
  3. PersistenceExceptionTranslator exceptionTranslator) {
  4. notNull(sqlSession, "SqlSession must not be null");
  5. notNull(executorType, "ExecutorType must not be null");
  6. this.sqlSession = sqlSession;
  7. this.executorType = executorType;
  8. this.exceptionTranslator = exceptionTranslator;
  9. }

所以说我们只需要把目光焦距在 
TransactionSynchronizationManager 就可以了,它的内部持有了很多个元素为 Map<Object, Object> 的 ThreadLocal(代码示例中只贴出了 resources 这一个 ThreadLocal ):

  1. public abstract class TransactionSynchronizationManager {
  2. private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);
  3. private static final ThreadLocal<Map<Object, Object>> resources =
  4. new NamedThreadLocal<>("Transactional resources");
  5. @Nullable
  6. public static Object getResource(Object key) {
  7. Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
  8. Object value = doGetResource(actualKey);
  9. if (value != null && logger.isTraceEnabled()) {
  10. logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
  11. Thread.currentThread().getName() + "]");
  12. }
  13. return value;
  14. }
  15. @Nullable
  16. private static Object doGetResource(Object actualKey) {
  17. Map<Object, Object> map = resources.get();
  18. if (map == null) {
  19. return null;
  20. }
  21. Object value = map.get(actualKey);
  22. // Transparently remove ResourceHolder that was marked as void...
  23. if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
  24. map.remove(actualKey);
  25. // Remove entire ThreadLocal if empty...
  26. if (map.isEmpty()) {
  27. resources.remove();
  28. }
  29. value = null;
  30. }
  31. return value;
  32. }

也就是说,spring 的事务,是借助 
TransactionSynchronizationManager + SqlSessionHolder 对 sqlSession 的控制来实现的。

那么这样就很清晰了,如下总结,也如下图:

  • MapperProxy 内置的 sqlSession 是 sqlSessiontemplate
  • sqlSessiontemplate 通过持有 SqlSessionFactory 来创建真正的 SqlSession
  • TransactionSynchronizationManager + SqlSessionHolder 则扮演着 SqlSession 管理的角色

四、spring 如何管理 sqlSession

上一个小节只是讲了是什么,没有讲为什么,到了这里如果有好奇宝宝一定会好奇诸如 spring 的一系列事务控制是怎么实现的,当然本文不会讲太多 spring 事务管理相关的太多东西,以后会有后续文章专门剖析事务管理。

我们可以简单看下 TransactionInterceptor ,这是 @Transactional 注解的代理类。

  1. /**
  2. * AOP Alliance MethodInterceptor for declarative transaction
  3. * management using the common Spring transaction infrastructure
  4. * ({@link org.springframework.transaction.PlatformTransactionManager}/
  5. * {@link org.springframework.transaction.ReactiveTransactionManager}).
  6. *
  7. * <p>Derives from the {@link TransactionAspectSupport} class which
  8. * contains the integration with Spring's underlying transaction API.
  9. * TransactionInterceptor simply calls the relevant superclass methods
  10. * such as {@link #invokeWithinTransaction} in the correct order.
  11. *
  12. * <p>TransactionInterceptors are thread-safe.
  13. *
  14. * @author Rod Johnson
  15. * @author Juergen Hoeller
  16. * @see TransactionProxyFactoryBean
  17. * @see org.springframework.aop.framework.ProxyFactoryBean
  18. * @see org.springframework.aop.framework.ProxyFactory
  19. */
  20. @SuppressWarnings("serial")
  21. public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
  22. /**
  23. * Create a new TransactionInterceptor.
  24. * <p>Transaction manager and transaction attributes still need to be set.
  25. * @see #setTransactionManager
  26. * @see #setTransactionAttributes(java.util.Properties)
  27. * @see #setTransactionAttributeSource(TransactionAttributeSource)
  28. */
  29. public TransactionInterceptor() {
  30. }
  31. @Override
  32. @Nullable
  33. public Object invoke(MethodInvocation invocation) throws Throwable {
  34. // Work out the target class: may be {@code null}.
  35. // The TransactionAttributeSource should be passed the target class
  36. // as well as the method, which may be from an interface.
  37. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  38. // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  39. return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
  40. }

可以看到它的代理方法 invoke() 的执行逻辑在 invokeWithinTransaction() 里:

  1. --代码位于 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction --
  2. @Nullable
  3. protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
  4. final InvocationCallback invocation) throws Throwable {
  5. // If the transaction attribute is null, the method is non-transactional.
  6. TransactionAttributeSource tas = getTransactionAttributeSource();
  7. final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  8. final TransactionManager tm = determineTransactionManager(txAttr);
  9. if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
  10. // 响应式事务相关
  11. }
  12. PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
  13. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  14. if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
  15. // Standard transaction demarcation with getTransaction and commit/rollback calls.
  16. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
  17. Object retVal;
  18. try {
  19. // This is an around advice: Invoke the next interceptor in the chain.
  20. // This will normally result in a target object being invoked.
  21. retVal = invocation.proceedWithInvocation();
  22. }
  23. catch (Throwable ex) {
  24. // target invocation exception
  25. completeTransactionAfterThrowing(txInfo, ex);
  26. throw ex;
  27. }
  28. finally {
  29. cleanupTransactionInfo(txInfo);
  30. }
  31. if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
  32. // Set rollback-only in case of Vavr failure matching our rollback rules...
  33. TransactionStatus status = txInfo.getTransactionStatus();
  34. if (status != null && txAttr != null) {
  35. retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
  36. }
  37. }
  38. commitTransactionAfterReturning(txInfo);
  39. return retVal;
  40. }
  41. else {
  42. // CallbackPreferringPlatformTransactionManager 的处理逻辑
  43. }
  44. }

invokeWithinTransaction() 的代码虽然长,我们还是把它分段来看:

4.1 TransactionDefinition 与 TransactionManager 的创建

  • 第一部分,准备阶段

也就是这部分代码:

  1. // If the transaction attribute is null, the method is non-transactional.
  2. TransactionAttributeSource tas = getTransactionAttributeSource();
  3. final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  4. final TransactionManager tm = determineTransactionManager(txAttr);
  5. PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
  6. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

获取 TransactionAttribute(TransactionDefinition(底层接口),这里面装载了事务传播等级,隔离级别等属性。TransactionAttribute 的创建依据配置,或者我们的事务传播等级注解,对什么异常进行回滚等,后续会继续对它的应用做说明, 
PlatformTransactionManager 则是进行事务管理的主要操作者。

4.2 获取 TransactionInfo

  • 第二部分,事务开启或者获取与准备,也就是我们执行逻辑的第一行代码 createTransactionIfNecessary()(是不是和前面说到的 SqlSession的创建或者获取很像?)

我们可以看到 
createTransactionIfNecessary() 的实现就做了两件事,其一是获取一个叫做 TransactionStatus 的东西,另外则是调用 prepareTransactionInfo(),获取一个 TransactionInfo:

  1. // Standard transaction demarcation with getTransaction and commit/rollback calls.
  2. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
  3. --代码位于 org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary --
  4. protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
  5. @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
  6. TransactionStatus status = tm.getTransaction(txAttr);
  7. return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  8. }

先看看第一件事,也就是获取 TransactionStatus,它保存了事务的 savePoint ,是否新事物等。删减掉一些判断方法,代码如下:

  1. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
  2. throws TransactionException {
  3. // Use defaults if no transaction definition given.
  4. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
  5. Object transaction = doGetTransaction();
  6. boolean debugEnabled = logger.isDebugEnabled();
  7. if (isExistingTransaction(transaction)) {
  8. // Existing transaction found -> check propagation behavior to find out how to behave.
  9. return handleExistingTransaction(def, transaction, debugEnabled);
  10. }
  11. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  12. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  13. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  14. SuspendedResourcesHolder suspendedResources = suspend(null);
  15. try {
  16. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  17. DefaultTransactionStatus status = newTransactionStatus(
  18. def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  19. doBegin(transaction, def);
  20. prepareSynchronization(status, def);
  21. return status;
  22. }
  23. catch (RuntimeException | Error ex) {
  24. resume(null, suspendedResources);
  25. throw ex;
  26. }
  27. }
  28. else {
  29. // Create "empty" transaction: no actual transaction, but potentially synchronization.
  30. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
  31. logger.warn("Custom isolation level specified but no actual transaction initiated; " +
  32. "isolation level will effectively be ignored: " + def);
  33. }
  34. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  35. return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
  36. }
  37. }

代码很长,但是不急,我们可以简单看出它分为两个部分:

  • 第一部分是获取事务 doGetTransaction()
  • 第二部分则是判断是否新事物,如果不是新事物,则执行 handleExistingTransaction,如果是新事物则 TransactionDefinition.PROPAGATION_REQUIRED、TransactionDefinition.PROPAGATION_REQUIRES_NEW、TransactionDefinition.PROPAGATION_NESTED 是一种逻辑其余是另一种逻辑,信息量有点大,但是慢慢来:

4.2.1 doGetTransaction

  1. protected Object doGetTransaction() {
  2. DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  3. txObject.setSavepointAllowed(isNestedTransactionAllowed());
  4. ConnectionHolder conHolder =
  5. (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
  6. txObject.setConnectionHolder(conHolder, false);
  7. return txObject;
  8. }

doGetTransaction 获取我们的事务对象,这里也使用了 
TransactionSynchronizationManager(前面说到的 SqlSession 的管理类),事务对象会尝试获取本事务所使用的连接对象,这个和事务传播等级有关,先立个 flag。

我们可以看到这里面主要逻辑就是去获取 ConnectionHolder,实际上很简单,只要能获取到,就是已经存在的事务,获取不到(或者事务已经关闭)就是新事物。

4.2.2 新事物的处理之创建一个真正的事务对象

如果说前面无法从 
TransactionSynchronizationManager 获取到 conHolder,或者说,我们的线程中并没有 ConnectionHolder那么将会进入此分支,此分支的支持的三个事务传播等级 TransactionDefinition.PROPAGATION_REQUIRED、TransactionDefinition.PROPAGATION_REQUIRES_NEW、TransactionDefinition.PROPAGATION_NESTED 都是需要创建新事务的,所以它们在同一个分支里面:

  1. SuspendedResourcesHolder suspendedResources = suspend(null);
  2. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  3. DefaultTransactionStatus status = newTransactionStatus(
  4. def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  5. doBegin(transaction, def);
  6. prepareSynchronization(status, def);
  7. return status;

SuspendedResourcesHolder 与事务的挂起相关,doBegin() 则是对连接对象 connection 的获取和配置,prepareSynchronization() 则是对新事物的一些初始化操作。我们一点点看:

  1. /**
  2. * This implementation sets the isolation level but ignores the timeout.
  3. */
  4. @Override
  5. protected void doBegin(Object transaction, TransactionDefinition definition) {
  6. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  7. Connection con = null;
  8. if (!txObject.hasConnectionHolder() ||
  9. txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
  10. Connection newCon = obtainDataSource().getConnection();
  11. if (logger.isDebugEnabled()) {
  12. logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
  13. }
  14. txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  15. }
  16. txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
  17. con = txObject.getConnectionHolder().getConnection();
  18. Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
  19. txObject.setPreviousIsolationLevel(previousIsolationLevel);
  20. txObject.setReadOnly(definition.isReadOnly());
  21. // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
  22. // so we don't want to do it unnecessarily (for example if we've explicitly
  23. // configured the connection pool to set it already).
  24. if (con.getAutoCommit()) {
  25. txObject.setMustRestoreAutoCommit(true);
  26. if (logger.isDebugEnabled()) {
  27. logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
  28. }
  29. con.setAutoCommit(false);
  30. }
  31. prepareTransactionalConnection(con, definition);
  32. txObject.getConnectionHolder().setTransactionActive(true);
  33. // Bind the connection holder to the thread.
  34. if (txObject.isNewConnectionHolder()) {
  35. TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
  36. }
  37. }
  38. }

可以看到,ConnectionHolder 的创建和连接的打开就是在这里进行的,创建后,设置其隔离级别,取消 connection 的自动提交,将提交操作纳入到 spring 管理,并且将其存到 
TransactionSynchronizationManager 使得 4.2.1 提到的 doGetTransaction() 可以拿到此 ConnectionHolder。


做完连接的获取与配置后,下一步就是对事物的一些初始化:

  1. /**
  2. * Initialize transaction synchronization as appropriate.
  3. */
  4. protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
  5. if (status.isNewSynchronization()) {
  6. TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
  7. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
  8. definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
  9. definition.getIsolationLevel() : null);
  10. TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
  11. TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
  12. TransactionSynchronizationManager.initSynchronization();
  13. }
  14. }

这个代码都是代码字面意义的简单设置,就不赘述了。

4.2.3 新事物的处理之创建一个虚假的事务对象

刚才讲的是 “无法从 
TransactionSynchronizationManager 获取到 conHolder”,并且属于一些需要创建新事物的传播等级的情况。

如果说方才没有事务,也不需要创建新的事务,则会进入此分支,创建一个空的 TransactionStatus,内部的事务对象为空,代码很简单就不贴了,有兴趣可以去看看 
org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction 的最后一个分支。

4.2.4 事务嵌套

刚才说的都是无法获取到 conHolder 的情况,如果获取到了,则又是另一套代码了,handleExistingTransaction 很长,它的第一个部分是对传播等级的控制,有兴趣的小伙伴可以去看看源码,我这里只挑一个简单的传播等级 PROPAGATION_NESTED_NEW做说明(其他的会在专门的事务一期做讲解):

  1. -- 代码位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction --
  2. private TransactionStatus handleExistingTransaction(
  3. TransactionDefinition definition, Object transaction, boolean debugEnabled)
  4. throws TransactionException {
  5. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
  6. if (debugEnabled) {
  7. logger.debug("Suspending current transaction, creating new transaction with name [" +
  8. definition.getName() + "]");
  9. }
  10. SuspendedResourcesHolder suspendedResources = suspend(transaction);
  11. try {
  12. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  13. DefaultTransactionStatus status = newTransactionStatus(
  14. definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  15. doBegin(transaction, definition);
  16. prepareSynchronization(status, definition);
  17. return status;
  18. }
  19. catch (RuntimeException | Error beginEx) {
  20. resumeAfterBeginException(transaction, suspendedResources, beginEx);
  21. throw beginEx;
  22. }
  23. }
  24. ... 略
  25. }

我们可以发现和 4.2.2 新事物的处理 代码是一样的,唯一的区别就是此 TransactionStatus 对象会真正内嵌一个事务挂起对象 SuspendedResourcesHolder 。

4.3 封装 TransactionInfo

拿到 TransactionStatus 之后, prepareTransactionInfo() 里简单的将刚才那些 
PlatformTransactionManager 、TransactionAttribute、TransactionStatus 包装成一个 TransactionInfo 对象,并将其保存在 ThreadLocal 中,这个 bindToThread() 还会将当前已经持有的 TransactionInfo 对象暂存。

  1. protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
  2. @Nullable TransactionAttribute txAttr, String joinpointIdentification,
  3. @Nullable TransactionStatus status) {
  4. TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
  5. if (txAttr != null) {
  6. // The transaction manager will flag an error if an incompatible tx already exists.
  7. txInfo.newTransactionStatus(status);
  8. }
  9. // We always bind the TransactionInfo to the thread, even if we didn't create
  10. // a new transaction here. This guarantees that the TransactionInfo stack
  11. // will be managed correctly even if no transaction was created by this aspect.
  12. txInfo.bindToThread();
  13. return txInfo;
  14. }

到这里思路就很清晰了,代理为我们做的事情就是生成了一个叫做 TransactionInfo 的东西,里面的 TransactionManager 可以使得 spring 去对最底层的 connection 对象做一些回滚,提交操作。TransactionStatus 则保存挂起的事务的信息,以及当前事务的一些状态,如下图:

4.4 纵览流程

让我们回到第四节开头的那段很长的代码,到这里是不是很明了了:

  1. protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
  2. final InvocationCallback invocation) throws Throwable {
  3. // If the transaction attribute is null, the method is non-transactional.
  4. TransactionAttributeSource tas = getTransactionAttributeSource();
  5. final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  6. final TransactionManager tm = determineTransactionManager(txAttr);
  7. PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
  8. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  9. if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
  10. // Standard transaction demarcation with getTransaction and commit/rollback calls.
  11. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
  12. Object retVal;
  13. try {
  14. // This is an around advice: Invoke the next interceptor in the chain.
  15. // This will normally result in a target object being invoked.
  16. retVal = invocation.proceedWithInvocation();
  17. }
  18. catch (Throwable ex) {
  19. // target invocation exception
  20. completeTransactionAfterThrowing(txInfo, ex);
  21. throw ex;
  22. }
  23. finally {
  24. cleanupTransactionInfo(txInfo);
  25. }
  26. if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
  27. // Set rollback-only in case of Vavr failure matching our rollback rules...
  28. TransactionStatus status = txInfo.getTransactionStatus();
  29. if (status != null && txAttr != null) {
  30. retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
  31. }
  32. }
  33. commitTransactionAfterReturning(txInfo);
  34. return retVal;
  35. }
  36. }
  • 1、获取 TransactionInfo
  • 2、执行切面
  • 3、将之前挂起的 TransactionInfo 找回:
  1. private void bindToThread() {
  2. // Expose current TransactionStatus, preserving any existing TransactionStatus
  3. // for restoration after this transaction is complete.
  4. this.oldTransactionInfo = transactionInfoHolder.get();
  5. transactionInfoHolder.set(this);
  6. }
  7. private void restoreThreadLocalStatus() {
  8. // Use stack to restore old transaction TransactionInfo.
  9. // Will be null if none was set.
  10. transactionInfoHolder.set(this.oldTransactionInfo);
  11. }
  • 4、如果需要,则提交当前事务
  • 5、返回切面值

4.5 最后一块拼图,spring 如何与 sqlSession 产生关联:

我们在第三章讲到,mybatis有一个叫做 defualtSqlSessionFactory 的类,负责创建 sqlSession,但是它和 spring 又是怎么产生关联的呢?

答案就在于,spring 实现了自己的 TransactionFactory,以及自己的 Transaction 对象 SpringManagedTransaction 。回顾一下 SqlSession 的创建过程:

  1. private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
  2. Transaction tx = null;
  3. try {
  4. final Environment environment = configuration.getEnvironment();
  5. final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
  6. tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
  7. final Executor executor = configuration.newExecutor(tx, execType);
  8. return new DefaultSqlSession(configuration, executor, autoCommit);
  9. } catch (Exception e) {
  10. closeTransaction(tx); // may have fetched a connection so lets call close()
  11. throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
  12. } finally {
  13. ErrorContext.instance().reset();
  14. }
  15. }

看一下 SpringManagedTransaction 是如何管理 connection的:

  1. private void openConnection() throws SQLException {
  2. this.connection = DataSourceUtils.getConnection(this.dataSource);
  3. this.autoCommit = this.connection.getAutoCommit();
  4. this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
  5. LOGGER.debug(() -> "JDBC Connection [" + this.connection + "] will"
  6. + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
  7. }


DataSourceUtils.getConnection(this.dataSource); 划重点,里面的实现不用我多说了,我们可以看到熟悉的身影,也就是 ConnectionHolder,连接是从这里(优先)拿的:

  1. ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
  2. if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
  3. conHolder.requested();
  4. if (!conHolder.hasConnection()) {
  5. logger.debug("Fetching resumed JDBC Connection from DataSource");
  6. conHolder.setConnection(fetchConnection(dataSource));
  7. }
  8. return conHolder.getConnection();
  9. }

更新整套体系图:

我们整体简单过一次:

  • mybatis 启动时根据xml、注解创建了 mapperedStatement,用于sql执行,创建了 SqlSessionFactory 用于创建 SqlSession 对象。
  • mybatis 启动时创建了 MapperProxyFactory 用于创建接口的代理对象 MapperProxy
  • 在创建 MapperProxy 时,spring 为其注入了一个 sqlSession 用于 sql执行,但是这个 sqlSession 是一个代理对象,叫做 sqlSessionTemplate,它会自动选择我们该使用哪个 sqlSession 去执行
  • 在执行时,spring 切面在执行事务之前,会创建一个叫做 TransactionInfo 的对象,此对象会根据事务传播等级来控制是否创建新连接,是否挂起上一个连接,将信息保存在 TransactionSynchronizationManager
  • 到了真正需要创建或者获取 sqlSession 时,spring 重写的 TransactionFactory 会优先去 TransactionSynchronizationManager 中拿连接对象。

原文:
https://my.oschina.net/anur/blog/3153927

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/707743
推荐阅读
相关标签
  

闽ICP备14008679号