赞
踩
利用自动装配类SeataAutoConfiguration引入AbstractAutoProxyCreator类型
的后置处理器SeataAutoDataSourceProxyCreator。并且由后置处理器初始化引介通知IntroductionAdvisor
类型的DefaultIntroductionAdvisor
以及MethodInterceptor
类型的通知SeataAutoDataSourceProxyAdvice
。
Seata在RM中不是通过注解GlobalTransactional、GlobalLock实现分布式事务,而是通过直接代理数据源DataSource
的方式实现与TC之间的通信进而充当分布式事务中RM角色。
切面类advisor之DefaultIntroductionAdvisor以及通知advice之SeataAutoDataSourceProxyAdvice。
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource
customTargetSource) throws BeansException {
return new Object[]{advisor};
}
@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}
}
DataSource的所有方法都会被DataSourceProxy
代理类代理完成。
public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (m != null) {
return m.invoke(dataSourceProxy, args);//由DataSourceProxy完成对DataSource相关方法的代理执行
} else {
return invocation.proceed();// 当前advisor持有的下一个advice。例如aop相关的各类通知
}
}
}
Seata除了DataSourceProxy
代理原生的DataSource,同时还包括连接Connect的代理类ConnectionProxy
、PreparedStatement的代理类PreparedStatementProxy
。
通过DataSource获取链接Connect,通过Connect再创建PreparedStatement,最终由PreparedStatement执行最终的SQL。
public class SimpleExecutor extends BaseExecutor {
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
}
public abstract class BaseStatementHandler implements StatementHandler {
public Statement prepare(Connection connection, Integer transactionTimeout) throws SQLException {
Statement statement = instantiateStatement(connection);
setStatementTimeout(statement, transactionTimeout);
setFetchSize(statement);
return statement;
}
}
public class PreparedStatementHandler extends BaseStatementHandler { protected Statement instantiateStatement(Connection connection) throws SQLException { String sql = boundSql.getSql(); if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) { String[] keyColumnNames = mappedStatement.getKeyColumns(); if (keyColumnNames == null) { return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS); } else { return connection.prepareStatement(sql, keyColumnNames); } } else if (mappedStatement.getResultSetType() != null) { return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), ResultSet.CONCUR_READ_ONLY); } else { return connection.prepareStatement(sql); } } }
public class PreparedStatementHandler extends BaseStatementHandler {
public int update(Statement statement) throws SQLException {
PreparedStatement ps = (PreparedStatement) statement;
ps.execute();
int rows = ps.getUpdateCount();
Object parameterObject = boundSql.getParameterObject();
KeyGenerator keyGenerator = mappedStatement.getKeyGenerator();
keyGenerator.processAfter(executor, mappedStatement, ps, parameterObject);
return rows;
}
}
在Seata中PreparedStatement的代理类PreparedStatementProxy
是通过ExecuteTemplate
实现SQL执行的。
public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement{
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}
}
上述execute方法中lambda表达式是分布式事务核心逻辑执行完之后触发目标SQL
的终极执行。
public class ExecuteTemplate { public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy,StatementCallback<T, S> statementCallback,Object... args){ ... String dbType = statementProxy.getConnectionProxy().getDbType(); ... switch (sqlRecognizer.getSQLType()) { case INSERT: executor = InsertExecutor.. break; case UPDATE: executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; default: executor = new PlainExecutor<>(statementProxy, statementCallback); break; } rs = executor.execute(args);//BaseTransactionalExecutor#execute return rs; } }
public abstract class BaseTransactionalExecutor{ public T execute(Object... args) throws Throwable { if (RootContext.inGlobalTransaction()) { String xid = RootContext.getXID(); //数据库连接绑定全局事务ID statementProxy.getConnectionProxy().bind(xid); }//数据库连接绑定全局锁标识 statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock()); return doExecute(args);//AbstractDMLBaseExecutor#doExecute } protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException { ... ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage; String lockKeys = buildLockKey(lockKeyRecords);// 提前初始化全局锁对应的锁key,形式为:表名:主键1,主键2,..., connectionProxy.appendLockKey(lockKeys); SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage); connectionProxy.appendUndoLog(sqlUndoLog); } }
ConnectionProxy的属性ConnectionContext
:将全局事务ID、全局锁标识以及全部锁的Key跟数据库连接建立绑定关系。
public abstract class AbstractDMLBaseExecutor extends BaseTransactionalExecutor{ @Override public T doExecute(Object... args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); if (connectionProxy.getAutoCommit()) {//事务默认都是自动提交 return executeAutoCommitTrue(args); } else { return executeAutoCommitFalse(args); } } protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.setAutoCommit(false);//禁止事务自动提交 return new LockRetryPolicy(connectionProxy).execute(() -> {//参考 LockRetryPolicy 章节 T result = executeAutoCommitFalse(args); connectionProxy.commit();// ConnectionProxy#commit 参考 ConnectionProxy 章节 return result; }); } catch (Exception e) { if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } } // 前后镜像、目标方法、UndoLog入库位于同一个本地事务,任何一方出现错误保证多方均会回滚 protected T executeAutoCommitFalse(Object[] args) throws Exception { ... TableRecords beforeImage = beforeImage();// 前镜像,行数据更改前的状态 T result = statementCallback.execute(statementProxy.getTargetStatement(), args);// 执行目标SQL TableRecords afterImage = afterImage(beforeImage);//后镜像 prepareUndoLog(beforeImage, afterImage);//调用父类 准备当前事务的UndoLog表字段内容 return result; } }
executeAutoCommitFalse:
public class ConnectionProxy extends AbstractConnectionProxy { private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy(); private ConnectionContext context = new ConnectionContext(); public void commit() throws SQLException { LOCK_RETRY_POLICY.execute(() -> {//参考 LockRetryPolicy 章节 doCommit(); return null; }); } private void doCommit() throws SQLException { if (context.inGlobalTransaction()) {//@GlobalTransactional 当前请求域中是否存在全局事务ID processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) {//@GlobalLock 当前请求域中是否存在全局锁标识 processLocalCommitWithGlobalLocks(); } else { targetConnection.commit();//常规提交 } } }
ConnectionContext:其属性通过BaseTransactionalExecutor赋值完毕。
public class ConnectionProxy extends AbstractConnectionProxy {
private void processGlobalTransactionCommit() throws SQLException {
register();// 根据现有参数向TC申请本地事务ID之branchId
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
targetConnection.commit();//提交当前连接涉及的全部本地事务
context.reset();
}
}
recognizeLockKeyConflictException:如果LockKeys
存在的前提下抛出LockConflictException
。
flushUndoLogs:最终插入AbstractDMLBaseExecutor
提前备好的UndoLog日志。
checkLock:如果操作的当前行其主键被TC所持有则当前请求获取锁失败则抛出LockConflictException,不断轮询方式继续尝试获取锁。
public class ConnectionProxy extends AbstractConnectionProxy { private void processLocalCommitWithGlobalLocks() throws SQLException { checkLock(context.buildLockKeys());//全局锁的key为 tablename:主键1,主键2 targetConnection.commit(); context.reset(); } public void checkLock(String lockKeys) throws SQLException { try { String resourceId = getDataSourceProxy().getResourceId(); String xId = context.getXid(); boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,resourceId, xId, lockKeys); if (!lockable) { throw new LockConflictException(); } } catch (TransactionException e) { recognizeLockKeyConflictException(e, lockKeys);//存在抛出LockConflictException异常的场景 } } }
表示尝试获取轻量级锁|乐观锁|自旋锁
的策略。AbstractDMLBaseExecutor以及ConnectionProxy都存在内部类LockRetryPolicy。
AbstractDMLBaseExecutor、ConnectionProxy两种之间锁策略的选择具有互斥性。默认情况下选择AbstractDMLBaseExecutor的锁策略。
AbstractDMLBaseExecutor之锁范围比ConnectionProxy锁的范围更广。
public class ConnectionProxy extends AbstractConnectionProxy { private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy { public <T> T execute(Callable<T> callable) throws Exception { if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {//默认为true return callable.call(); } else { return doRetryOnLockConflict(callable); } } protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception { LockRetryController lockRetryController = new LockRetryController(); while (true) { try { return callable.call(); } catch (LockConflictException lockConflict) { onException(lockConflict); lockRetryController.sleep(lockConflict); } catch (Exception e) { onException(e); throw e; } } } } }
LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT属性:用户可以配置选择策略。
doRetryOnLockConflict:如果存在LockConflictException
异常则通过while循环不断重复执行Callable。这也是seata分布式事务中锁策略。
public abstract class AbstractDMLBaseExecutor{
private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {
@Override
public <T> T execute(Callable<T> callable) throws Exception {
if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {//默认为true
return doRetryOnLockConflict(callable);
} else {
return callable.call();
}
}
}
}
全局事务A中包含本地事务localA1、localA2。全局事务B中包含本地事务localB1、localB2。
A、B均未提交的前提下localA1是可以读取到localB1提交后的同行数据。也即在全局事务的角度来看,B未提交的情况下数据可以被其他事务A读取到,这种情况称分布式事务的隔离级别为读未提交。如果事务B二阶段由于localB2发生回滚,则localA1读取的数据就是脏数据了。
读隔离即指localA1不能读取到localB1提交后的数据,即提升分布式事务的隔离级别为读已提交。
Seata通过 SELECT FOR UPDATE 语句的代理
实现分布式事务读隔离。SELECT FOR UPDATE 语句的代理也称之为写锁或者排它锁。真实应用中Select语句必须存在 FOR UPDATE语句后缀Seata才会为其代理,具体如下:
public class SelectForUpdateExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> { public T doExecute(Object... args) throws Throwable { Connection conn = statementProxy.getConnection(); DatabaseMetaData dbmd = conn.getMetaData(); T rs; Savepoint sp = null; LockRetryController lockRetryController = new LockRetryController(); boolean originalAutoCommit = conn.getAutoCommit(); ArrayList<List<Object>> paramAppenderList = new ArrayList<>(); String selectPKSQL = buildSelectSQL(paramAppenderList); try { if (originalAutoCommit) { conn.setAutoCommit(false);//禁止事务自动提交 } else if (dbmd.supportsSavepoints()) { sp = conn.setSavepoint(); } else { throw new SQLException("not support savepoint. please check your db version"); } while (true) { try { // 执行目标Select语句,当前Select语句是存在写锁的。意味着跟update、delete DML语句存在锁竞争问题 rs = statementCallback.execute(statementProxy.getTargetStatement(), args); TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList); String lockKeys = buildLockKey(selectPKRows);// 生成分布式事务中的全局锁的key if (StringUtils.isNullOrEmpty(lockKeys)) { break; } //Select语句对应的目标方法存在@GlobalTransaction或者@GlobalLock注解都能实现读隔离效果 if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) { // 从TC尝试获取全局锁,如果获取失败则抛出lce继续尝试获取锁,否则获取成功 statementProxy.getConnectionProxy().checkLock(lockKeys); } else { throw new RuntimeException("Unknown situation!"); } break; } catch (LockConflictException lce) { if (sp != null) { conn.rollback(sp); } else { //必须回滚,因为此时Select语句存在写锁的,不回滚会导致当前本地事务睡眠期间其他事务是竞争不到锁的 // 减少写锁时间,提升应用性能 conn.rollback(); } lockRetryController.sleep(lce);//当前事务睡眠 } } } finally { if (sp != null) { conn.releaseSavepoint(sp); } if (originalAutoCommit) { conn.setAutoCommit(true); } } return rs; } }
如上所述,1.整个方法中涉及的连接Connect必须为同一个Connect。
2.本地事务从执行目标Select语句到获取全局锁的这段段时间内,Select对应的行是一直存在写锁的,这也严重影响了应用的并发性能。所以TCC模式推荐为常用分布式事务模式。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。