当前位置:   article > 正文

2.Seata之AT模式资源管理器RM_seata 如何确定rm的

seata 如何确定rm的

利用自动装配类SeataAutoConfiguration引入AbstractAutoProxyCreator类型的后置处理器SeataAutoDataSourceProxyCreator。并且由后置处理器初始化引介通知IntroductionAdvisor类型的DefaultIntroductionAdvisor以及MethodInterceptor类型的通知SeataAutoDataSourceProxyAdvice

Seata在RM中不是通过注解GlobalTransactional、GlobalLock实现分布式事务,而是通过直接代理数据源DataSource的方式实现与TC之间的通信进而充当分布式事务中RM角色。

1、SeataAutoDataSourceProxyCreator

切面类advisor之DefaultIntroductionAdvisor以及通知advice之SeataAutoDataSourceProxyAdvice。

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
    private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());
    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource 	
    		customTargetSource) throws BeansException {
        return new Object[]{advisor};
    }

    @Override
    protected boolean shouldSkip(Class<?> beanClass, String beanName) {
        return SeataProxy.class.isAssignableFrom(beanClass) ||
            !DataSource.class.isAssignableFrom(beanClass) ||
            Arrays.asList(excludes).contains(beanClass.getName());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2、SeataAutoDataSourceProxyAdvice

DataSource的所有方法都会被DataSourceProxy代理类代理完成。

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
        Method method = invocation.getMethod();
        Object[] args = invocation.getArguments();
        Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
        if (m != null) {
            return m.invoke(dataSourceProxy, args);//由DataSourceProxy完成对DataSource相关方法的代理执行
        } else {
            return invocation.proceed();// 当前advisor持有的下一个advice。例如aop相关的各类通知
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2.1、DataSourceProxy衔接Mybatis流程

Seata除了DataSourceProxy代理原生的DataSource,同时还包括连接Connect的代理类ConnectionProxy、PreparedStatement的代理类PreparedStatementProxy

通过DataSource获取链接Connect,通过Connect再创建PreparedStatement,最终由PreparedStatement执行最终的SQL。

public class SimpleExecutor extends BaseExecutor {
	private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
	  Statement stmt;
	  Connection connection = getConnection(statementLog);
	  stmt = handler.prepare(connection, transaction.getTimeout());
	  handler.parameterize(stmt);
	  return stmt;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  1. 正常流程中从DataSource获取数据库连接Connection。Seata是对DataSource通过引介通知做了代理,此时获取连接是从DataSourceProxy获取到ConnectionProxy类型的连接。
  2. 获取连接后进一步完成PreparedStatement的创建。
public abstract class BaseStatementHandler implements StatementHandler {
	public Statement prepare(Connection connection, Integer transactionTimeout) throws SQLException {
		 Statement statement = instantiateStatement(connection);
		 setStatementTimeout(statement, transactionTimeout);
		 setFetchSize(statement);
		 return statement;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 利用ConnectionProxy得到PreparedStatement之PreparedStatementProxy。
public class PreparedStatementHandler extends BaseStatementHandler {
	protected Statement instantiateStatement(Connection connection) throws SQLException {
	  String sql = boundSql.getSql();
	  if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) {
	    String[] keyColumnNames = mappedStatement.getKeyColumns();
	    if (keyColumnNames == null) {
	      return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS);
	    } else {
	      return connection.prepareStatement(sql, keyColumnNames);
	    }
	  } else if (mappedStatement.getResultSetType() != null) {
	    return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), 
	    															ResultSet.CONCUR_READ_ONLY);
	  } else {
	    return connection.prepareStatement(sql);
	  }
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
public class PreparedStatementHandler extends BaseStatementHandler {
	public int update(Statement statement) throws SQLException {
	  PreparedStatement ps = (PreparedStatement) statement;
	  ps.execute();
	  int rows = ps.getUpdateCount();
	  Object parameterObject = boundSql.getParameterObject();
	  KeyGenerator keyGenerator = mappedStatement.getKeyGenerator();
	  keyGenerator.processAfter(executor, mappedStatement, ps, parameterObject);
	  return rows;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 此时通过PreparedStatementProxy执行最终SQL之前的准备工作,同时也是Seata实现分布式事务的核心逻辑。

在Seata中PreparedStatement的代理类PreparedStatementProxy是通过ExecuteTemplate实现SQL执行的。

3、ExecuteTemplate

在这里插入图片描述

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement{

	public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

上述execute方法中lambda表达式是分布式事务核心逻辑执行完之后触发目标SQL的终极执行。

public class ExecuteTemplate {

	public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
	    StatementProxy<S> statementProxy,StatementCallback<T, S> statementCallback,Object... args){
	    ...
	   	String dbType = statementProxy.getConnectionProxy().getDbType();
	   	...
	    switch (sqlRecognizer.getSQLType()) {
	    case INSERT:
	         executor = InsertExecutor..
	         break;
	     case UPDATE:
	         executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
	         break;
	     case DELETE:
	         executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
	         break;
	     case SELECT_FOR_UPDATE:
	         executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
	         break;
	     default:
	         executor = new PlainExecutor<>(statementProxy, statementCallback);
	         break;
		 }
		rs = executor.execute(args);//BaseTransactionalExecutor#execute
		return rs;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

3.1.BaseTransactionalExecutor

public abstract class BaseTransactionalExecutor{

	public T execute(Object... args) throws Throwable {
	    if (RootContext.inGlobalTransaction()) {
	        String xid = RootContext.getXID();
	        //数据库连接绑定全局事务ID
	        statementProxy.getConnectionProxy().bind(xid);
	    }//数据库连接绑定全局锁标识
	    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
	    return doExecute(args);//AbstractDMLBaseExecutor#doExecute
	}
	
	protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        ...
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        String lockKeys = buildLockKey(lockKeyRecords);// 提前初始化全局锁对应的锁key,形式为:表名:主键1,主键2,...,
        connectionProxy.appendLockKey(lockKeys);
        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

ConnectionProxy的属性ConnectionContext:将全局事务ID、全局锁标识以及全部锁的Key跟数据库连接建立绑定关系。

3.1.1.AbstractDMLBaseExecutor

public abstract class AbstractDMLBaseExecutor extends BaseTransactionalExecutor{
	@Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {//事务默认都是自动提交
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
	
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            connectionProxy.setAutoCommit(false);//禁止事务自动提交
            return new LockRetryPolicy(connectionProxy).execute(() -> {//参考 LockRetryPolicy 章节
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();// ConnectionProxy#commit 参考 ConnectionProxy 章节
                return result;
            });
        } catch (Exception e) {
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        }
    }
	
	  // 前后镜像、目标方法、UndoLog入库位于同一个本地事务,任何一方出现错误保证多方均会回滚
	 protected T executeAutoCommitFalse(Object[] args) throws Exception {
        ...
        TableRecords beforeImage = beforeImage();// 前镜像,行数据更改前的状态
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);// 执行目标SQL
        TableRecords afterImage = afterImage(beforeImage);//后镜像
        prepareUndoLog(beforeImage, afterImage);//调用父类 准备当前事务的UndoLog表字段内容 
        return result;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

executeAutoCommitFalse:

  1. 准备前后镜像内容。对于update语句其生成前镜像的过程中存在Select … for update共享锁的…。
  2. 执行目标SQL。【不会自动提交】
  3. 准备当前事务的UndoLog表字段内容。

4、ConnectionProxy

public class ConnectionProxy extends AbstractConnectionProxy {
	private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy();
	private ConnectionContext context = new ConnectionContext();
	
	public void commit() throws SQLException {
	    LOCK_RETRY_POLICY.execute(() -> {//参考 LockRetryPolicy 章节
            doCommit();
            return null;
        });
	}
	
	private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {//@GlobalTransactional 当前请求域中是否存在全局事务ID
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {//@GlobalLock 当前请求域中是否存在全局锁标识
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();//常规提交
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

ConnectionContext:其属性通过BaseTransactionalExecutor赋值完毕。

4.1.@GlobalTransactional

public class ConnectionProxy extends AbstractConnectionProxy {

	private void processGlobalTransactionCommit() throws SQLException {
		register();// 根据现有参数向TC申请本地事务ID之branchId
		UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
	    targetConnection.commit();//提交当前连接涉及的全部本地事务
		context.reset();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

recognizeLockKeyConflictException:如果LockKeys存在的前提下抛出LockConflictException
flushUndoLogs:最终插入AbstractDMLBaseExecutor提前备好的UndoLog日志。

4.2.@GlobalLock

checkLock:如果操作的当前行其主键被TC所持有则当前请求获取锁失败则抛出LockConflictException,不断轮询方式继续尝试获取锁。

public class ConnectionProxy extends AbstractConnectionProxy {

	private void processLocalCommitWithGlobalLocks() throws SQLException {
        checkLock(context.buildLockKeys());//全局锁的key为 tablename:主键1,主键2
        targetConnection.commit();
        context.reset();
    }

	public void checkLock(String lockKeys) throws SQLException {
        try {
        	String resourceId = getDataSourceProxy().getResourceId();
        	String xId = context.getXid();
            boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,resourceId, xId, lockKeys);
            if (!lockable) {
                throw new LockConflictException();
            }
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, lockKeys);//存在抛出LockConflictException异常的场景
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

6、LockRetryPolicy

表示尝试获取轻量级锁|乐观锁|自旋锁的策略。AbstractDMLBaseExecutor以及ConnectionProxy都存在内部类LockRetryPolicy。

AbstractDMLBaseExecutor、ConnectionProxy两种之间锁策略的选择具有互斥性。默认情况下选择AbstractDMLBaseExecutor的锁策略。
AbstractDMLBaseExecutor之锁范围比ConnectionProxy锁的范围更广。

public class ConnectionProxy extends AbstractConnectionProxy {
	private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {
		 public <T> T execute(Callable<T> callable) throws Exception {
		    if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {//默认为true
		        return callable.call();
		    } else {
		        return doRetryOnLockConflict(callable);
		    }
		}
		
		protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
		    LockRetryController lockRetryController = new LockRetryController();
		    while (true) {
		        try {
		            return callable.call();
		        } catch (LockConflictException lockConflict) {
		            onException(lockConflict);
		            lockRetryController.sleep(lockConflict);
		        } catch (Exception e) {
		            onException(e);
		            throw e;
		        }
		    }
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT属性:用户可以配置选择策略。
doRetryOnLockConflict:如果存在LockConflictException异常则通过while循环不断重复执行Callable。这也是seata分布式事务中锁策略。

public abstract class AbstractDMLBaseExecutor{

	private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {
	    @Override
	    public <T> T execute(Callable<T> callable) throws Exception {
	        if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {//默认为true
	            return doRetryOnLockConflict(callable);
	        } else {
	            return callable.call();
	        }
	    }
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

7、读写隔离

全局事务A中包含本地事务localA1、localA2。全局事务B中包含本地事务localB1、localB2。

7.1、读隔离

A、B均未提交的前提下localA1是可以读取到localB1提交后的同行数据。也即在全局事务的角度来看,B未提交的情况下数据可以被其他事务A读取到,这种情况称分布式事务的隔离级别为读未提交。如果事务B二阶段由于localB2发生回滚,则localA1读取的数据就是脏数据了。

读隔离即指localA1不能读取到localB1提交后的数据,即提升分布式事务的隔离级别为读已提交。

Seata通过 SELECT FOR UPDATE 语句的代理 实现分布式事务读隔离。SELECT FOR UPDATE 语句的代理也称之为写锁或者排它锁。真实应用中Select语句必须存在 FOR UPDATE语句后缀Seata才会为其代理,具体如下:

public class SelectForUpdateExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    
    public T doExecute(Object... args) throws Throwable {
        Connection conn = statementProxy.getConnection();
        DatabaseMetaData dbmd = conn.getMetaData();
        T rs;
        Savepoint sp = null;
        LockRetryController lockRetryController = new LockRetryController();
        boolean originalAutoCommit = conn.getAutoCommit();
        ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
        String selectPKSQL = buildSelectSQL(paramAppenderList);
        try {
            if (originalAutoCommit) {
                conn.setAutoCommit(false);//禁止事务自动提交
            } else if (dbmd.supportsSavepoints()) {
                sp = conn.setSavepoint();
            } else {
                throw new SQLException("not support savepoint. please check your db version");
            }

            while (true) {
                try {
                	// 执行目标Select语句,当前Select语句是存在写锁的。意味着跟update、delete DML语句存在锁竞争问题
                    rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
                    TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
                    String lockKeys = buildLockKey(selectPKRows);// 生成分布式事务中的全局锁的key
                    if (StringUtils.isNullOrEmpty(lockKeys)) {
                        break;
                    }
                    //Select语句对应的目标方法存在@GlobalTransaction或者@GlobalLock注解都能实现读隔离效果
                    if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                    	// 从TC尝试获取全局锁,如果获取失败则抛出lce继续尝试获取锁,否则获取成功
                        statementProxy.getConnectionProxy().checkLock(lockKeys);
                    } else {
                        throw new RuntimeException("Unknown situation!");
                    }
                    break;
                } catch (LockConflictException lce) {
                    if (sp != null) {
                        conn.rollback(sp);
                    } else {
                    	//必须回滚,因为此时Select语句存在写锁的,不回滚会导致当前本地事务睡眠期间其他事务是竞争不到锁的
                    	// 减少写锁时间,提升应用性能
                        conn.rollback();
                    }
                    lockRetryController.sleep(lce);//当前事务睡眠
                }
            }
        } finally {
            if (sp != null) {
                conn.releaseSavepoint(sp);
            }
            if (originalAutoCommit) {
                conn.setAutoCommit(true);
            }
        }
        return rs;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

如上所述,1.整个方法中涉及的连接Connect必须为同一个Connect。
2.本地事务从执行目标Select语句到获取全局锁的这段段时间内,Select对应的行是一直存在写锁的,这也严重影响了应用的并发性能。所以TCC模式推荐为常用分布式事务模式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/286135
推荐阅读
相关标签
  

闽ICP备14008679号