Seata Source Code Analysis (4): How the Data Source Proxy Is Generated

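The previous posts walked through the source code of the whole Seata client flow, covering the TM and the RM. Starting today we go into the details and analyze the Seata source piece by piece. Picking up where the last post left off, this one looks at how the data source proxy is generated and then demonstrates that data source with business code.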

I. Creating the proxy data source at startup

1. Entry point of the automatic proxy configuration (loaded SPI-style by Spring Boot): in the seata-spring-boot-starter module, the SeataDataSourceAutoConfiguration class turns on automatic proxying of Seata data sources.

Open the SeataDataSourceAutoConfiguration class:

    @ConditionalOnBean(DataSource.class) // applies only when a DataSource bean is present
    // The properties below correspond to entries in the configuration file, e.g. the configuration held in Nacos
    @ConditionalOnExpression("${seata.enable:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
    public class SeataDataSourceAutoConfiguration {

        /**
         * The bean seataDataSourceBeanPostProcessor.
         * Creates the proxy object for the data source.
         */
        @Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
        @ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
        public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
            // step into this constructor
            return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
        }

        /**
         * The bean seataAutoDataSourceProxyCreator.
         * The bean above creates the proxy object for the data source. So how do inserts, updates,
         * deletes and queries get switched over to the proxy data source at runtime?
         * SeataAutoDataSourceProxyCreator extends the abstract class AbstractAutoProxyCreator, which Spring
         * uses to create AOP proxies. It implements the BeanPostProcessor interface so that it can create
         * a proxy for a bean once the bean has been initialized.
         * In Seata, the purpose of this class is to attach an Advisor to the data source. When the data source
         * is used, the call goes through the invoke method of SeataAutoDataSourceProxyAdvice
         * (much as a @GlobalTransactional method goes through GlobalTransactionalInterceptor.invoke).
         */
        @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
        @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
        public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
            // step into this constructor
            return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
                seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
        }
    }

The properties referenced in the annotation correspond to entries in the configuration file, for example the configuration stored in Nacos.

To see how they are bound on the Java side, look at the SeataProperties class, where the mapping is clear at a glance.

2. Step into SeataDataSourceBeanPostProcessor:

    public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {

        private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

        private final List<String> excludes;
        private final BranchType dataSourceProxyMode;

        // constructor
        public SeataDataSourceBeanPostProcessor(String[] excludes, String dataSourceProxyMode) {
            this.excludes = Arrays.asList(excludes);
            this.dataSourceProxyMode = BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode) ? BranchType.XA : BranchType.AT;
        }

        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) {
            return bean;
        }

        // This class implements BeanPostProcessor; this callback runs after the bean has been initialized
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean instanceof DataSource) {
                // When not in the excludes, put and init proxy.
                if (!excludes.contains(bean.getClass().getName())) {
                    // Only put and init proxy, not return proxy. The work happens in DataSourceProxyHolder.
                    DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
                }
                // If is SeataDataSourceProxy, return the original data source.
                if (bean instanceof SeataDataSourceProxy) {
                    LOGGER.info("Unwrap the bean of the data source," +
                        " and return the original data source to replace the data source proxy.");
                    return ((SeataDataSourceProxy) bean).getTargetDataSource();
                }
            }
            return bean;
        }
    }

3. Step into DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode). The DataSourceProxyHolder singleton is obtained first.

Obtaining the holder:

    // Returns the DataSourceProxyHolder singleton
    public static DataSourceProxyHolder get() {
        return Holder.INSTANCE;
    }

Creating the proxy:

    public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
        DataSource originalDataSource;
        // 1. If the data source is already a SeataDataSourceProxy, check whether it can be returned as-is
        if (dataSource instanceof SeataDataSourceProxy) {
            SeataDataSourceProxy dataSourceProxy = (SeataDataSourceProxy) dataSource;
            // If it's the right proxy (same branch type), return it directly.
            if (dataSourceProxyMode == dataSourceProxy.getBranchType()) {
                return (SeataDataSourceProxy) dataSource;
            }
            // Otherwise get the original data source.
            originalDataSource = dataSourceProxy.getTargetDataSource();
        } else {
            originalDataSource = dataSource;
        }
        // 2. Look up the proxy for this data source in the map that caches proxies
        SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
        if (dsProxy == null) {
            // 3. If there is none, create the proxy and put it into the map
            synchronized (dataSourceProxyMap) {
                dsProxy = dataSourceProxyMap.get(originalDataSource);
                if (dsProxy == null) {
                    // create the proxy data source; step into this method
                    dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
                    dataSourceProxyMap.put(originalDataSource, dsProxy);
                }
            }
        }
        return dsProxy; // 4. return the proxy
    }

Step into the createDsProxyByMode method:

    private SeataDataSourceProxy createDsProxyByMode(BranchType mode, DataSource originDs) {
        // XA mode creates a DataSourceProxyXA; any other mode (AT) creates a DataSourceProxy.
        // At this point the proxy data source exists, and RM operations on the database resource can run through it.
        return BranchType.XA == mode ? new DataSourceProxyXA(originDs) : new DataSourceProxy(originDs);
    }
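The caching in putDataSource is a double-checked "get or create" pattern: an unlocked lookup first, then a second lookup under the lock before creating. The following is a minimal sketch of that pattern only. WrapperHolder, getOrCreate and the factory function are hypothetical names introduced for illustration, and the use of ConcurrentHashMap is an assumption of this sketch, not a statement about Seata's internals.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical stand-in for a proxy holder: caches one wrapper per original object. */
class WrapperHolder<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> factory;

    WrapperHolder(Function<K, V> factory) {
        this.factory = factory;
    }

    V getOrCreate(K original) {
        V wrapper = cache.get(original);           // first check, without the lock
        if (wrapper == null) {
            synchronized (cache) {
                wrapper = cache.get(original);     // second check, under the lock
                if (wrapper == null) {
                    wrapper = factory.apply(original);
                    cache.put(original, wrapper);
                }
            }
        }
        return wrapper;
    }
}

class WrapperHolderDemo {
    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        WrapperHolder<String, String> holder = new WrapperHolder<>(key -> {
            created.incrementAndGet();
            return "proxy:" + key;
        });
        String first = holder.getOrCreate("ds1");
        String second = holder.getOrCreate("ds1");
        // The factory runs once; both calls see the same cached wrapper.
        System.out.println((first == second) + " created=" + created.get());
    }
}
```

The second lookup inside the synchronized block is what prevents two threads that both missed the first lookup from creating two wrappers for the same original object.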

4. Now go back to the top and look at the SeataAutoDataSourceProxyCreator class:

    public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {

        private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);

        private final List<String> excludes;
        private final Advisor advisor;

        public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
            this.excludes = Arrays.asList(excludes);
            /**
             * Because an AOP proxy has been added, any operation on the data source ends up in the
             * invoke method of SeataAutoDataSourceProxyAdvice. Step into it.
             */
            this.advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode));
            setProxyTargetClass(!useJdkProxy);
        }

        // Attach the Advisor to the data source bean
        @Override
        protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Auto proxy of [{}]", beanName);
            }
            return new Object[]{advisor};
        }

        // Skip beans that are not a DataSource, are already a SeataProxy, or are explicitly excluded
        @Override
        protected boolean shouldSkip(Class<?> beanClass, String beanName) {
            return !DataSource.class.isAssignableFrom(beanClass) ||
                SeataProxy.class.isAssignableFrom(beanClass) ||
                excludes.contains(beanClass.getName());
        }
    }

5. Step into SeataAutoDataSourceProxyAdvice:

    public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {

        private final BranchType dataSourceProxyMode;
        private final Class<? extends SeataDataSourceProxy> dataSourceProxyClazz;

        public SeataAutoDataSourceProxyAdvice(String dataSourceProxyMode) {
            if (BranchType.AT.name().equalsIgnoreCase(dataSourceProxyMode)) {
                this.dataSourceProxyMode = BranchType.AT;
                this.dataSourceProxyClazz = DataSourceProxy.class;
            } else if (BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode)) {
                this.dataSourceProxyMode = BranchType.XA;
                this.dataSourceProxyClazz = DataSourceProxyXA.class;
            } else {
                throw new IllegalArgumentException("Unknown dataSourceProxyMode: " + dataSourceProxyMode);
            }
            // Set the default branch type in the RootContext.
            RootContext.setDefaultBranchType(this.dataSourceProxyMode);
        }

        // Every call on the proxied data source enters here
        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
                return invocation.proceed();
            }
            // The method being called on the data source, e.g. getConnection()
            Method method = invocation.getMethod();
            Object[] args = invocation.getArguments();
            // Find the matching method on the proxy data source class, e.g. DataSourceProxy.getConnection()
            Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
            if (m != null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
                // get (or create) the proxy data source
                SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
                // invoke the method on the proxy data source instead
                return m.invoke(dataSourceProxy, args);
            } else {
                return invocation.proceed();
            }
        }

        @Override
        public Class<?>[] getInterfaces() {
            return new Class[]{SeataProxy.class};
        }
    }
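The core of invoke is a reflective redirect: look up a method with the same name and parameter types on the wrapper class, call it on the wrapper if it exists, and otherwise fall through to the original target. Below is a minimal sketch of that idea using a plain JDK dynamic proxy instead of Spring AOP. Source, PlainSource, WrappedSource and RedirectingHandler are hypothetical stand-ins for javax.sql.DataSource, the real pool, DataSourceProxy and the advice; none of them are Seata types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical stand-in for javax.sql.DataSource. */
interface Source {
    String getConnection();
    String describe();
}

class PlainSource implements Source {
    public String getConnection() { return "raw-connection"; }
    public String describe() { return "plain"; }
}

/** Hypothetical stand-in for a data source proxy: it only declares getConnection. */
class WrappedSource {
    private final Source target;
    WrappedSource(Source target) { this.target = target; }
    public String getConnection() { return "proxied(" + target.getConnection() + ")"; }
}

class RedirectingHandler implements InvocationHandler {
    private final Source target;
    private final WrappedSource wrapper;

    RedirectingHandler(Source target) {
        this.target = target;
        this.wrapper = new WrappedSource(target);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Method redirected = findDeclared(WrappedSource.class, method);
        if (redirected != null) {
            return redirected.invoke(wrapper, args);   // the wrapper handles this call
        }
        return method.invoke(target, args);            // everything else goes to the target
    }

    private static Method findDeclared(Class<?> type, Method method) {
        try {
            return type.getDeclaredMethod(method.getName(), method.getParameterTypes());
        } catch (NoSuchMethodException e) {
            return null;
        }
    }
}

class RedirectDemo {
    static Source wrap(Source target) {
        return (Source) Proxy.newProxyInstance(
            Source.class.getClassLoader(), new Class<?>[]{Source.class}, new RedirectingHandler(target));
    }

    public static void main(String[] args) {
        Source source = wrap(new PlainSource());
        System.out.println(source.getConnection()); // redirected to WrappedSource
        System.out.println(source.describe());      // not declared on WrappedSource, reaches PlainSource
    }
}
```

Callers keep holding a reference typed as the original interface, which is why business code does not need to know that a proxy is involved.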

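6. Who calls getAdvicesAndAdvisorsForBean? Looking up its callers (Ctrl + G) leads to a very familiar method: wrapIfNecessary in AbstractAutoProxyCreator, which Spring runs from postProcessAfterInitialization to decide whether a bean should be wrapped in a proxy.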

II. Tracing the data source through business code

First, keep in mind that a data source, proxied or not, is always used through JDBC, either directly or through an ORM framework built on top of it (MyBatis and so on). To keep the demonstration simple we use Spring's JdbcTemplate, a thin layer over plain JDBC, which makes the source easier to follow.

1. Configure the proxy data source in the business application

    /**
     * Seata data source proxy configuration
     */
    @Data
    @Configuration
    @ConfigurationProperties(prefix = "druid-master", ignoreInvalidFields = true)
    public class DataSourceConfiguration {

        // data source parameters bound from configuration
        private String driverClassName;
        private String username;
        private String jdbcUrl;
        private String password;
        private int maxActive;
        private int minIdle;
        private int initialSize;
        private Long timeBetweenEvictionRunsMillis;
        private Long minEvictableIdleTimeMillis;
        private String validationQuery;
        private boolean testWhileIdle;
        private boolean testOnBorrow;
        private boolean testOnReturn;
        private boolean poolPreparedStatements;
        private Integer maxPoolPreparedStatementPerConnectionSize;
        private String filters;
        private String connectionProperties;

        // the original (unproxied) data source
        @Bean(name = "masterDataSource", destroyMethod = "close", initMethod = "init")
        public DataSource getMasterDs() {
            DruidDataSource druidDataSource = new DruidDataSource();
            druidDataSource.setDriverClassName(driverClassName);
            druidDataSource.setUrl(jdbcUrl);
            druidDataSource.setUsername(username);
            druidDataSource.setPassword(password);
            druidDataSource.setMaxActive(maxActive);
            druidDataSource.setMinIdle(minIdle); // minIdle was bound but never applied
            druidDataSource.setInitialSize(initialSize);
            // was setTimeBetweenConnectErrorMillis(...), which sets a different Druid property
            druidDataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
            druidDataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
            druidDataSource.setValidationQuery(validationQuery);
            druidDataSource.setTestWhileIdle(testWhileIdle);
            druidDataSource.setTestOnBorrow(testOnBorrow);
            druidDataSource.setTestOnReturn(testOnReturn);
            druidDataSource.setPoolPreparedStatements(poolPreparedStatements);
            druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize);
            try {
                druidDataSource.setFilters(filters);
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return druidDataSource;
        }

        @Bean
        JdbcTemplate jdbcTemplate(@Qualifier("masterDataSource") DataSource dataSource) {
            // wrap the data source in Seata's proxy
            JdbcTemplate jdbcTemplate = new JdbcTemplate(new DataSourceProxy(dataSource));
            return jdbcTemplate;
        }
    }

Part of the yml configuration used for the test:

    druid-master:
      jdbcUrl: jdbc:mysql://16.2.12.18:3309/paycallback?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&useSSL=false
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
      minIdle: 2
      maxActive: 10
      maxWait: 60000
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxPoolPreparedStatementPerConnectionSize: 20
      filters: stat,wall
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000

2. Use it from business code

    @Autowired
    private JdbcTemplate jdbcTemplate; // injected

Pseudocode snippet:

    OrderAction orderAction = orderActionMapper.selectByPrimaryKey(Long.parseLong(actionId));
    Order order = orderMapper.selectByPrimaryKey(orderAction.getOrderId());
    String updateOrderSql = "update tp_order_kill set pay_status = ?,pay_time = ? where order_id = ?";
    jdbcTemplate.update(updateOrderSql, new PreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps) throws SQLException {
            ps.setInt(1, PayStatus.PAID.getCode());
            ps.setLong(2, System.currentTimeMillis());
            ps.setLong(3, order.getOrderId());
        }
    });

3. Step into jdbcTemplate.update(...), which enters JdbcTemplate:

    @Override
    public int update(String sql, Object... args) throws DataAccessException {
        return update(sql, newArgPreparedStatementSetter(args)); // step in
    }

This leads to:

    @Override
    public int update(String sql, PreparedStatementSetter pss) throws DataAccessException {
        return update(new SimplePreparedStatementCreator(sql), pss); // step in
    }

And then to:

    protected int update(final PreparedStatementCreator psc, final PreparedStatementSetter pss)
            throws DataAccessException {
        logger.debug("Executing prepared SQL update");
        // step into execute: this is the core code
        return execute(psc, new PreparedStatementCallback<Integer>() {
            @Override
            public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
                try {
                    if (pss != null) {
                        pss.setValues(ps);
                    }
                    int rows = ps.executeUpdate(); // the actual SQL execution
                    if (logger.isDebugEnabled()) {
                        logger.debug("SQL update affected " + rows + " rows");
                    }
                    return rows;
                }
                finally {
                    if (pss instanceof ParameterDisposer) {
                        ((ParameterDisposer) pss).cleanupParameters();
                    }
                }
            }
        });
    }

4. Step into execute(psc, new PreparedStatementCallback<Integer>() {...}):

    @Override
    public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
            throws DataAccessException {
        Assert.notNull(psc, "PreparedStatementCreator must not be null");
        Assert.notNull(action, "Callback object must not be null");
        if (logger.isDebugEnabled()) {
            String sql = getSql(psc);
            logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
        }
        // Key point: getDataSource() returns the proxy data source
        Connection con = DataSourceUtils.getConnection(getDataSource());
        PreparedStatement ps = null;
        try {
            Connection conToUse = con;
            if (this.nativeJdbcExtractor != null &&
                    this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativePreparedStatements()) {
                conToUse = this.nativeJdbcExtractor.getNativeConnection(con);
            }
            // Key point: the PreparedStatement returned here is Seata's proxy class
            ps = psc.createPreparedStatement(conToUse);
            applyStatementSettings(ps);
            PreparedStatement psToUse = ps;
            if (this.nativeJdbcExtractor != null) {
                psToUse = this.nativeJdbcExtractor.getNativePreparedStatement(ps);
            }
            T result = action.doInPreparedStatement(psToUse); // run the business SQL
            handleWarnings(ps);
            return result;
        }
        catch (SQLException ex) {
            // Release Connection early, to avoid potential connection pool deadlock
            // in the case when the exception translator hasn't been initialized yet.
            if (psc instanceof ParameterDisposer) {
                ((ParameterDisposer) psc).cleanupParameters();
            }
            String sql = getSql(psc);
            psc = null;
            JdbcUtils.closeStatement(ps);
            ps = null;
            DataSourceUtils.releaseConnection(con, getDataSource());
            con = null;
            throw getExceptionTranslator().translate("PreparedStatementCallback", sql, ex);
        }
        finally {
            if (psc instanceof ParameterDisposer) {
                ((ParameterDisposer) psc).cleanupParameters();
            }
            JdbcUtils.closeStatement(ps);
            DataSourceUtils.releaseConnection(con, getDataSource());
        }
    }

5. Step into getDataSource(), which returns the proxy data source:

    public DataSource getDataSource() {
        return this.dataSource;
    }

Step into psc.createPreparedStatement(conToUse). The version shown here is the one in PreparedStatementCreatorFactory. With the SimplePreparedStatementCreator created above, the call reduces to con.prepareStatement(sql); either way the statement is prepared on the proxied connection.

    @Override
    public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
        PreparedStatement ps;
        if (generatedKeysColumnNames != null || returnGeneratedKeys) {
            if (generatedKeysColumnNames != null) {
                ps = con.prepareStatement(this.actualSql, generatedKeysColumnNames);
            }
            else {
                ps = con.prepareStatement(this.actualSql, PreparedStatement.RETURN_GENERATED_KEYS);
            }
        }
        else if (resultSetType == ResultSet.TYPE_FORWARD_ONLY && !updatableResults) {
            ps = con.prepareStatement(this.actualSql);
        }
        else {
            ps = con.prepareStatement(this.actualSql, resultSetType,
                updatableResults ? ResultSet.CONCUR_UPDATABLE : ResultSet.CONCUR_READ_ONLY);
        }
        setValues(ps);
        return ps;
    }

6. Step into con.prepareStatement, which enters the AbstractConnectionProxy class.

7. In AbstractConnectionProxy:

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        PreparedStatement preparedStatement = targetConnection.prepareStatement(sql, autoGeneratedKeys);
        return new PreparedStatementProxy(this, preparedStatement, sql); // step in
    }

8. Step into PreparedStatementProxy, the proxy class. This is the important part:

    /**
     * The type Prepared statement proxy.
     *
     * @author sharajava
     *
     * The global transaction flow continues into the business method annotated with @GlobalTransactional.
     * When a SQL statement is executed there, every execution goes through Seata's proxy methods,
     * because Seata has proxied the data source.
     * In JDBC the object that executes SQL is a PreparedStatement; its proxy class in Seata is
     * PreparedStatementProxy, whose execute methods call ExecuteTemplate.execute.
     */
    public class PreparedStatementProxy extends AbstractPreparedStatementProxy
        implements PreparedStatement, ParametersHolder {

        @Override
        public Map<Integer,ArrayList<Object>> getParameters() {
            return parameters;
        }

        /**
         * Instantiates a new Prepared statement proxy.
         *
         * @param connectionProxy the connection proxy
         * @param targetStatement the target statement
         * @param targetSQL the target sql
         * @throws SQLException the sql exception
         */
        public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement,
                                      String targetSQL) throws SQLException {
            super(connectionProxy, targetStatement, targetSQL);
        }

        @Override
        public boolean execute() throws SQLException { // key method
            return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
        }

        @Override
        public ResultSet executeQuery() throws SQLException {
            return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
        }

        @Override
        public int executeUpdate() throws SQLException {
            return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
        }
    }
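Each of the three execute methods hands the real JDBC call to ExecuteTemplate as a lambda, so the template decides what runs before and after it. As a rough illustration of that template-plus-callback shape (not of what ExecuteTemplate actually does around the call), here is a small sketch. Callback, StatementTemplate and the trace list are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback type: the "real" work that the template wraps. */
@FunctionalInterface
interface Callback<T, S> {
    T execute(S statement) throws Exception;
}

/** Hypothetical template: runs bookkeeping around whatever callback it is given. */
class StatementTemplate {
    final List<String> trace = new ArrayList<>();

    <T, S> T execute(S statement, Callback<T, S> callback) throws Exception {
        trace.add("before");                  // e.g. capture state before the statement runs
        try {
            return callback.execute(statement);
        } finally {
            trace.add("after");               // e.g. capture state after the statement runs
        }
    }
}

class StatementTemplateDemo {
    public static void main(String[] args) throws Exception {
        StatementTemplate template = new StatementTemplate();
        // The "statement" here is just a String and the "execution" is its length.
        Integer result = template.execute("update t set a = 1", sql -> sql.length());
        System.out.println(result + " " + template.trace);
    }
}
```

The benefit of this shape is that execute(), executeQuery() and executeUpdate() can share one code path and differ only in the lambda they pass.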

9. This class contains many statement-level methods, for example:

    @Override
    public boolean execute() throws SQLException { // key method
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }

When do these methods get called?

10. Go back upstream to action.doInPreparedStatement(psToUse). One implementation of the callback is AbstractLobCreatingPreparedStatementCallback:

    @Override
    public final Integer doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
        LobCreator lobCreator = this.lobHandler.getLobCreator();
        try {
            setValues(ps, lobCreator);
            return ps.executeUpdate(); // step in
        }
        finally {
            lobCreator.close();
        }
    }

11. Step into ps.executeUpdate().

The call must land here, because the object created upstream is a PreparedStatementProxy:

    @Override
    public int executeUpdate() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate()); // step in
    }

12. Step into ExecuteTemplate.execute:

    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args); // the core overload
    }

Continue into the overload:

    /**
     * Execute t.
     *
     * @param <T> the type parameter
     * @param <S> the type parameter
     * @param sqlRecognizers the sql recognizer list
     * @param statementProxy the statement proxy
     * @param statementCallback the statement callback
     * @param args the args
     * @return the t
     * @throws SQLException the sql exception
     * ExecuteTemplate creates a different executor depending on the type of SQL operation.
     */
    public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        // No global lock required and not in AT mode: execute the SQL directly
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        // Determine the database type, e.g. MySQL
        String dbType = statementProxy.getConnectionProxy().getDbType();
        // Parse the SQL being executed. sqlRecognizers is initially null, so it always has to be resolved here.
        // A plain SELECT yields null; INSERT, UPDATE and DELETE each yield a parsed recognizer.
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            // A SQLRecognizer is a parser for the statement: it exposes the table name,
            // the columns involved, the statement type and so on.
            sqlRecognizers = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                dbType);
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            // If Seata finds no suitable recognizer, it creates a PlainExecutor,
            // which simply runs the SQL on the native Statement.
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    // One executor each for insert, update, delete and locking select; everything else is plain
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                            new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                            new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE: // exclusive-lock query
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                // Handles a single SQL string that contains several DELETE or UPDATE statements
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            // Run the executor. This is a template-method hook implemented in BaseTransactionalExecutor;
            // for an update the concrete class is UpdateExecutor.
            rs = executor.execute(args); // step in
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
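The dispatch above boils down to "classify the statement, then pick an executor". The sketch below mimics only that decision. It classifies SQL by its leading keyword, which is a deliberate simplification assumed for this example (Seata uses a real SQL parser), and SqlKind, ExecutorChooser and the returned executor names as plain strings are hypothetical.

```java
import java.util.Locale;

/** Hypothetical statement categories, mirroring the cases in the switch above. */
enum SqlKind { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, OTHER }

class ExecutorChooser {

    /** Very rough classification by leading keyword; a real implementation parses the SQL. */
    static SqlKind classify(String sql) {
        String s = sql.trim().toLowerCase(Locale.ROOT);
        if (s.startsWith("insert")) return SqlKind.INSERT;
        if (s.startsWith("update")) return SqlKind.UPDATE;
        if (s.startsWith("delete")) return SqlKind.DELETE;
        if (s.startsWith("select") && s.endsWith("for update")) return SqlKind.SELECT_FOR_UPDATE;
        return SqlKind.OTHER;
    }

    /** Returns the name of the executor that would handle the statement. */
    static String chooseExecutor(String sql) {
        switch (classify(sql)) {
            case INSERT: return "InsertExecutor";
            case UPDATE: return "UpdateExecutor";
            case DELETE: return "DeleteExecutor";
            case SELECT_FOR_UPDATE: return "SelectForUpdateExecutor";
            default: return "PlainExecutor";   // plain SELECT and anything unrecognized
        }
    }
}

class ExecutorChooserDemo {
    public static void main(String[] args) {
        System.out.println(ExecutorChooser.chooseExecutor("update tp_order_kill set pay_status = ? where order_id = ?"));
        System.out.println(ExecutorChooser.chooseExecutor("select * from tp_order_kill"));
    }
}
```

The default branch matters as much as the named ones: statements that need no before/after image fall through to the plain path and pay no extra cost.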

 

13. Step into executor.execute(args), which enters the BaseTransactionalExecutor class:

    @Override
    public T execute(Object... args) throws Throwable {
        String xid = RootContext.getXID(); // get the global transaction id
        if (xid != null) {
            statementProxy.getConnectionProxy().bind(xid);
        }
        // record whether a global lock is required
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args); // overridden in the subclass AbstractDMLBaseExecutor
    }

14. Step into doExecute(args), which lands in AbstractDMLBaseExecutor:

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); // get the connection proxy
        // If the connection is in auto-commit mode (the JDBC default), Seata manages the commit itself
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else { // auto-commit is already off: execute now and leave the commit to the caller
            return executeAutoCommitFalse(args);
        }
    }

15. Step into executeAutoCommitTrue(args):

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            // Switch to manual commit (turn auto-commit off) so that the business SQL
            // and the undo_log insert run in the same local transaction
            connectionProxy.changeAutoCommit();
            // Commit the local transaction under a retry policy. LockRetryPolicy.execute runs the callback
            // on the calling thread and retries it when a lock conflict occurs.
            // The policy is controlled by retry-policy-branch-rollback-on-conflict: the lock policy used when
            // a branch transaction conflicts with another global transaction that is rolling back.
            // It defaults to true, which releases the local lock first so that the rollback can succeed.
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                // Run the manual-commit path to get the final result of the branch:
                // parse the SQL, query the beforeImage (using a "for update" that takes a local lock),
                // run the business SQL, then query the afterImage
                T result = executeAutoCommitFalse(args);
                // Acquire the global lock, insert the undo_log, and commit the business SQL together
                // with the undo_log. This is the core code; step in.
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            // clean up and restore auto-commit to true
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }
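The shape of executeAutoCommitTrue is: turn auto-commit off, run the work and commit, retry if a lock conflict is reported, and always restore auto-commit at the end. Here is a minimal sketch of that control flow. FakeConnection, LockConflict, RetryingExecutor and the fixed retry count are all hypothetical and stand in for the JDBC connection, Seata's lock-conflict exception and its retry policy; the real policy also sleeps between attempts and is driven by configuration.

```java
import java.util.concurrent.Callable;

/** Hypothetical exception signalling that another transaction holds the lock. */
class LockConflict extends Exception {
    LockConflict(String message) { super(message); }
}

/** Hypothetical stand-in for a JDBC connection; it only counts calls. */
class FakeConnection {
    boolean autoCommit = true;
    int commits = 0;
    int rollbacks = 0;
    void commit() { commits++; }
    void rollback() { rollbacks++; }
}

class RetryingExecutor {
    private final int maxRetries;

    RetryingExecutor(int maxRetries) { this.maxRetries = maxRetries; }

    <T> T run(FakeConnection con, Callable<T> work) throws Exception {
        con.autoCommit = false;                    // business SQL and undo log share one transaction
        try {
            int attempt = 0;
            while (true) {
                try {
                    T result = work.call();
                    con.commit();
                    return result;
                } catch (LockConflict e) {
                    con.rollback();                // give up local changes, then try again
                    if (++attempt > maxRetries) {
                        throw e;
                    }
                }
            }
        } finally {
            con.autoCommit = true;                 // always restore the original mode
        }
    }
}

class RetryingExecutorDemo {
    public static void main(String[] args) throws Exception {
        FakeConnection con = new FakeConnection();
        int[] calls = {0};
        String result = new RetryingExecutor(3).run(con, () -> {
            if (calls[0]++ < 2) {
                throw new LockConflict("lock held by another transaction");
            }
            return "ok";
        });
        System.out.println(result + " commits=" + con.commits + " rollbacks=" + con.rollbacks);
    }
}
```

The retry loop runs on the calling thread; nothing here is asynchronous.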

16. Step into connectionProxy.commit(), which enters ConnectionProxy:

    @Override
    public void commit() throws SQLException {
        try {
            LOCK_RETRY_POLICY.execute(() -> {
                doCommit(); // core code
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

Step into doCommit():

    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) { // inside a global transaction?
            processGlobalTransactionCommit(); // commit as a branch of the global transaction; step in
        } else if (context.isGlobalLockRequire()) { // @GlobalLock case
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit(); // plain commit on the original connection
        }
    }

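17. Step into the processGlobalTransactionCommit() method:

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            /**
             * Talk to the TC and request a global lock. It acts as a distributed lock (implemented on a
             * relational database when the server uses db store mode): acquiring it amounts to inserting
             * a row into lock_table, and a successful insert means the lock is held.
             * On the TC (server) side the core code is in the server module:
             * public class LockStoreDataBaseDAO implements LockStore.
             * Follow it down if you are interested; these are shared remote-call interfaces
             * that later posts will analyze in depth.
             */
            register(); // talk to the TC and register the branch: important
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            // Write the undo log to the database: if there are undo_log records, insert them into the
            // undo_log table. Phase-two rollback is driven by these records. Step in.
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // Commit the local transaction for real. The undo_log and the business SQL are in the same
            // transaction. Success here means this RM has succeeded, and the local lock is released.
            targetConnection.commit(); // the native commit of the business SQL
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false); // on failure, tell the TC that the branch transaction failed
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true); // on success, tell the TC that the branch transaction succeeded
        }
        context.reset();
    }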
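The order of operations in processGlobalTransactionCommit is the part worth remembering: register the branch (which takes the global lock), flush the undo log, commit locally, then report. The sketch below records only that ordering and the failure path. BranchCommitSketch and its step names are hypothetical; unlike the real method, it always reports success, whereas Seata does so only when IS_REPORT_SUCCESS_ENABLE is set.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the branch commit sequence; it records steps instead of doing I/O. */
class BranchCommitSketch {
    final List<String> steps = new ArrayList<>();
    private final boolean failLocalCommit;

    BranchCommitSketch(boolean failLocalCommit) {
        this.failLocalCommit = failLocalCommit;
    }

    void commit() {
        steps.add("register-branch");              // acquires the global lock through the coordinator
        try {
            steps.add("flush-undo-log");           // undo log joins the same local transaction
            if (failLocalCommit) {
                throw new IllegalStateException("local commit failed");
            }
            steps.add("local-commit");             // business SQL and undo log commit together
        } catch (RuntimeException e) {
            steps.add("report-failed");            // tell the coordinator this branch failed
            throw e;
        }
        steps.add("report-success");               // simplification: always reported in this sketch
    }
}

class BranchCommitDemo {
    public static void main(String[] args) {
        BranchCommitSketch ok = new BranchCommitSketch(false);
        ok.commit();
        System.out.println(ok.steps);

        BranchCommitSketch bad = new BranchCommitSketch(true);
        try {
            bad.commit();
        } catch (IllegalStateException e) {
            System.out.println(bad.steps);
        }
    }
}
```

Registering before the local commit is what lets the coordinator reject the branch on a lock conflict while the local transaction can still be rolled back.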

Step into register():

    // Register the branch transaction and obtain a branch id
    private void register() throws TransactionException {
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        // Before the local commit, the branch transaction is registered. The call ends up in
        // AbstractResourceManager.branchRegister, which sends the request to the TC.
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
        context.setBranchId(branchId); // store the branch id in the connection context
    }

18. Step into the branchRegister method, which enters DefaultResourceManager:

    @Override
    public Long branchRegister(BranchType branchType, String resourceId,
                               String clientId, String xid, String applicationData, String lockKeys)
        throws TransactionException {
        // delegates to AbstractResourceManager; step in
        return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData,
            lockKeys);
    }

19. Step into the branchRegister method, which enters AbstractResourceManager:

    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        try {
            BranchRegisterRequest request = new BranchRegisterRequest();
            request.setXid(xid);
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
            request.setBranchType(branchType);
            request.setApplicationData(applicationData);
            /**
             * On the TC, branch registration is handled by AbstractCore.branchRegister,
             * whose most important step is acquiring the global lock for this branch transaction.
             */
            BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
            if (response.getResultCode() == ResultCode.Failed) {
                throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
            }
            return response.getBranchId();
        } catch (TimeoutException toe) {
            throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
        } catch (RuntimeException rex) {
            throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
        }
    }

20. Step into the sendSyncRequest method, which enters AbstractNettyRemotingClient:

    @Override
    public Object sendSyncRequest(Object msg) throws TimeoutException {
        // pick one server instance for the transaction service group via load balancing
        String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
        int timeoutMillis = NettyClientConfig.getRpcRequestTimeout(); // request timeout
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
        // send batch message
        // put message into basketMap, @see MergedSendRunnable
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            // send batch message is sync request, needs to create messageFuture and put it in futures.
            MessageFuture messageFuture = new MessageFuture();
            messageFuture.setRequestMessage(rpcMessage);
            messageFuture.setTimeout(timeoutMillis);
            futures.put(rpcMessage.getId(), messageFuture);
            // put message into basketMap
            BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                key -> new LinkedBlockingQueue<>());
            if (!basket.offer(rpcMessage)) {
                LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                    serverAddress, rpcMessage);
                return null;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: {}", rpcMessage.getBody());
            }
            if (!isSending) {
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }
            try {
                return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {
                LOGGER.error("wait response error:{},ip:{},request:{}",
                    exx.getMessage(), serverAddress, rpcMessage.getBody());
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException) exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }
        } else {
            Channel channel = clientChannelManager.acquireChannel(serverAddress);
            // the actual send continues here
            return super.sendSync(channel, rpcMessage, timeoutMillis);
        }
    }

21. Click super.sendSync(channel, rpcMessage, timeoutMillis) to enter AbstractNettyRemoting:

    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        if (timeoutMillis <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        if (channel == null) {
            LOGGER.warn("sendSync nothing, caused by null channel.");
            return null;
        }
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);
        channelWritableCheck(channel, rpcMessage.getBody());
        String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        doBeforeRpcHooks(remoteAddr, rpcMessage);
        // Send through Netty's standard writeAndFlush API and attach a listener for write failures
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    messageFuture1.setResultMessage(future.cause());
                }
                destroyChannel(future.channel());
            }
        });
        try {
            // The write is asynchronous; the caller blocks here until the response arrives or the timeout elapses
            Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            doAfterRpcHooks(remoteAddr, rpcMessage, result);
            return result;
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
                rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    }
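The pattern in sendSync (register a future under the message id, write asynchronously, then block on the future) can be sketched with JDK classes only. This is a minimal illustration, not Seata's implementation: PendingRequests and its methods are hypothetical names, and CompletableFuture stands in for Seata's MessageFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the futures map used by sendSync; not Seata code.
class PendingRequests {
    private final Map<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // Register a future under the request id before the request is written.
    CompletableFuture<Object> register(int requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(requestId, future);
        return future;
    }

    // Called by the I/O thread when a response carrying the same id arrives.
    boolean complete(int requestId, Object response) {
        CompletableFuture<Object> future = futures.remove(requestId);
        return future != null && future.complete(response);
    }

    // Called when the write itself fails, similar in spirit to the write listener.
    boolean fail(int requestId, Throwable cause) {
        CompletableFuture<Object> future = futures.remove(requestId);
        return future != null && future.completeExceptionally(cause);
    }

    // Block the calling thread until the response arrives or the timeout elapses.
    Object await(int requestId, CompletableFuture<Object> future, long timeoutMillis) throws Exception {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            futures.remove(requestId); // drop the entry so it does not leak
            throw e;
        }
    }

    int pending() {
        return futures.size();
    }

    public static void main(String[] args) throws Exception {
        PendingRequests pending = new PendingRequests();
        CompletableFuture<Object> future = pending.register(1);
        pending.complete(1, "branch registered"); // normally done by another thread
        System.out.println(pending.await(1, future, 100));
    }
}
```

The request id is what ties the asynchronous write to the blocking caller: whichever thread receives the response only needs the id to wake the right waiter.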

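At this point the client-side flow of this call ends. Downstream, the TC side receives the message, which the next article covers in detail.

22. Back upstream, the undo log is persisted by UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);

Enter the class AbstractUndoLogManager implements UndoLogManager: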

    @Override
    public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
        ConnectionContext connectionContext = cp.getContext();
        if (!connectionContext.hasUndoLog()) {
            return;
        }
        String xid = connectionContext.getXid();
        long branchId = connectionContext.getBranchId();
        BranchUndoLog branchUndoLog = new BranchUndoLog();
        branchUndoLog.setXid(xid);
        branchUndoLog.setBranchId(branchId);
        branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
        UndoLogParser parser = UndoLogParserFactory.getInstance();
        byte[] undoLogContent = parser.encode(branchUndoLog);
        CompressorType compressorType = CompressorType.NONE;
        if (needCompress(undoLogContent)) {
            compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
            undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
        }
        // This is where the undo log is actually written to the database; step in
        insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
    }
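The "compress only when needed, and record which compressor was used" step in flushUndoLogs can be sketched as follows. This is an illustration under assumptions: the class name, the 64-byte threshold, and the "NONE"/"GZIP" tags are hypothetical, and the JDK's GZIP streams stand in for whatever compressor Seata is configured with.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch; names and threshold are assumptions, not Seata's values.
class UndoLogCompressionSketch {
    static final int THRESHOLD_BYTES = 64; // assumed threshold for illustration

    // Payload plus the tag that tells the reader how to decode it.
    static final class Encoded {
        final String compressorType;
        final byte[] payload;

        Encoded(String compressorType, byte[] payload) {
            this.compressorType = compressorType;
            this.payload = payload;
        }
    }

    static boolean needCompress(byte[] content) {
        return content.length > THRESHOLD_BYTES;
    }

    // Small payloads are stored as-is; large ones are gzip-compressed and tagged.
    static Encoded encode(byte[] content) throws IOException {
        if (!needCompress(content)) {
            return new Encoded("NONE", content);
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(content);
        }
        return new Encoded("GZIP", bos.toByteArray());
    }

    // The rollback side reads the tag first and only then decides how to decode.
    static byte[] decode(Encoded encoded) throws IOException {
        if ("NONE".equals(encoded.compressorType)) {
            return encoded.payload;
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(encoded.payload))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Encoded encoded = encode(new byte[1000]);
        System.out.println(encoded.compressorType + " " + encoded.payload.length);
    }
}
```

Storing the compressor tag next to the payload mirrors why flushUndoLogs passes buildContext(parser.getName(), compressorType) along with the content: a later rollback has to know how the bytes were produced.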

 

23. Click insertUndoLogWithNormal to enter.

We use a MySQL database, so execution enters MySQLUndoLogManager extends AbstractUndoLogManager:

    @Override
    protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
                                           Connection conn) throws SQLException {
        insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn); // step in
    }

Execution now reaches the plain JDBC call on the target (non-proxied) connection:

    private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
                               State state, Connection conn) throws SQLException {
        try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
            pst.setLong(1, branchId);
            pst.setString(2, xid);
            pst.setString(3, rollbackCtx);
            pst.setBytes(4, undoLogContent);
            pst.setInt(5, state.getValue());
            pst.executeUpdate(); // insert the undo log row used later to build the reverse SQL
        } catch (Exception e) {
            if (!(e instanceof SQLException)) {
                e = new SQLException(e);
            }
            throw (SQLException) e;
        }
    }

24. Back upstream to report(true): once the local commit succeeds, the client contacts the TC to report that the branch transaction executed successfully. The call works like the other TC communications.

    private void report(boolean commitDone) throws SQLException {
        if (context.getBranchId() == null) {
            return;
        }
        int retry = REPORT_RETRY_COUNT;
        while (retry > 0) {
            try {
                // Make the call; from here on the flow is similar to the RPC path above
                DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                    commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
                return;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                    + commitDone + "] Retry Countdown: " + retry);
                retry--;
                if (retry == 0) {
                    throw new SQLException("Failed to report branch status " + commitDone, ex);
                }
            }
        }
    }
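The countdown retry used by report can be isolated into a small generic helper. The sketch below is illustrative only: BoundedRetrySketch and callWithRetry are hypothetical names, and unlike report it catches Exception rather than Throwable.

```java
import java.util.concurrent.Callable;

// Hypothetical helper that isolates the countdown retry loop; not Seata code.
class BoundedRetrySketch {
    // Runs the action up to maxAttempts times and rethrows after the last failure.
    static <T> T callWithRetry(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        int retry = maxAttempts;
        while (true) {
            try {
                return action.call(); // success ends the loop immediately
            } catch (Exception ex) {
                retry--;
                if (retry == 0) {
                    // keep the last failure as the cause, as report does with SQLException
                    throw new Exception("gave up after " + maxAttempts + " attempts", ex);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = callWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("TC not reachable yet");
            }
            return "reported";
        }, 5);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Note that the loop retries immediately with no backoff, which matches the shape of report as shown above.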

25. Back upstream to return executeAutoCommitFalse(args); this step is executed regardless of whether the earlier step obtained the database operation instruction:

    /**
     * Execute auto commit false t.
     *
     * @param args the args
     * @return the t
     * @throws Exception the exception
     * Executes with auto-commit off. Images are built before and after the SQL actually runs to record the data state.
     * For an UPDATE, the row state before and after the change is recorded, and an exclusive lock is taken.
     */
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        // Parse the business SQL and assemble a SELECT from its conditions to query the rows before the business SQL runs.
        // Phase-two rollback relies on this result. The row lock (local lock) is taken here.
        // See UpdateExecutor
        TableRecords beforeImage = beforeImage(); // before image; enter the UpdateExecutor subclass
        // Execute the business SQL
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // Assemble a SELECT from the primary keys in the beforeImage result set, using them as the query condition
        TableRecords afterImage = afterImage(beforeImage); // after image; for updates see MultiUpdateExecutor
        // Wrap beforeImage and afterImage into one object and add it to a list that is later inserted into the undo_log table
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

Click beforeImage() to enter the UpdateExecutor class:

    @Override
    protected TableRecords beforeImage() throws SQLException {
        ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
        TableMeta tmeta = getTableMeta(); // fetch the table metadata
        String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList); // assemble the SELECT and add the row lock; step in
        return buildTableRecords(tmeta, selectSQL, paramAppenderList); // core code that queries the database
    }

Click getTableMeta(), which fetches the table metadata, in

BaseTransactionalExecutor<T, S extends Statement> implements Executor<T>

    protected TableMeta getTableMeta() {
        return getTableMeta(sqlRecognizer.getTableName()); // step in further
    }

Step in:

    protected TableMeta getTableMeta(String tableName) {
        if (tableMeta != null) {
            return tableMeta;
        }
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        // Fetch the table metadata through the connection proxy (cached per resource)
        tableMeta = TableMetaCacheFactory.getTableMetaCache(connectionProxy.getDbType())
            .getTableMeta(connectionProxy.getTargetConnection(), tableName, connectionProxy.getDataSourceProxy().getResourceId());
        return tableMeta;
    }

Click getTableMeta( to enter:

    @Override
    public TableMeta getTableMeta(final Connection connection, final String tableName, String resourceId) {
        if (StringUtils.isNullOrEmpty(tableName)) {
            throw new IllegalArgumentException("TableMeta cannot be fetched without tableName");
        }
        TableMeta tmeta;
        final String key = getCacheKey(connection, tableName, resourceId);
        tmeta = TABLE_META_CACHE.get(key, mappingFunction -> {
            try {
                return fetchSchema(connection, tableName); // load the schema from the database on a cache miss
            } catch (SQLException e) {
                LOGGER.error("get table meta of the table `{}` error: {}", tableName, e.getMessage(), e);
                return null;
            }
        });
        if (tmeta == null) {
            throw new ShouldNeverHappenException(String.format("[xid:%s]get table meta failed," +
                " please check whether the table `%s` exists.", RootContext.getXID(), tableName));
        }
        return tmeta;
    }
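The load-on-miss behaviour of getTableMeta can be illustrated with a plain map. This sketch swaps in ConcurrentHashMap.computeIfAbsent for the cache shown above; the class name, the key format, and the use of a column-name list in place of TableMeta are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a load-on-miss metadata cache; not Seata's TableMetaCache.
class TableMetaCacheSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> schemaLoader;

    TableMetaCacheSketch(Function<String, List<String>> schemaLoader) {
        this.schemaLoader = schemaLoader;
    }

    List<String> getColumns(String resourceId, String tableName) {
        if (tableName == null || tableName.isEmpty()) {
            throw new IllegalArgumentException("table name is required");
        }
        // Assumed key format: one entry per (resource, table) pair.
        String key = resourceId + "." + tableName;
        // The loader runs only on a miss; a null result is not cached.
        List<String> columns = cache.computeIfAbsent(key, k -> schemaLoader.apply(tableName));
        if (columns == null) {
            throw new IllegalStateException("table meta not found for " + tableName);
        }
        return columns;
    }

    public static void main(String[] args) {
        int[] loads = {0};
        TableMetaCacheSketch cache = new TableMetaCacheSketch(table -> {
            loads[0]++; // counts how often the "database" is queried
            return Arrays.asList("id", "user_id", "money");
        });
        cache.getColumns("jdbc:mysql://db", "account_tbl");
        cache.getColumns("jdbc:mysql://db", "account_tbl");
        System.out.println("schema loads: " + loads[0]);
    }
}
```

Because a failed load leaves no entry behind, a table that did not exist at first lookup is queried again on the next call instead of being remembered as missing.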

Click fetchSchema(connection, tableName), which reads the schema from the database, to enter MysqlTableMetaCache:

    @Override
    protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException {
        String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1";
        try (Statement stmt = connection.createStatement(); // create the statement used to read the metadata
             ResultSet rs = stmt.executeQuery(sql)) {
            return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
        } catch (SQLException sqlEx) {
            throw sqlEx;
        } catch (Exception e) {
            throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
        }
    }

Click buildBeforeImageSQL(tmeta, paramAppenderList); the pessimistic lock is added here:

    private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
        SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
        List<String> updateColumns = recognizer.getUpdateColumns();
        StringBuilder prefix = new StringBuilder("SELECT ");
        StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
        String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
        if (StringUtils.isNotBlank(whereCondition)) {
            suffix.append(WHERE).append(whereCondition);
        }
        String orderBy = recognizer.getOrderBy();
        if (StringUtils.isNotBlank(orderBy)) {
            suffix.append(orderBy);
        }
        ParametersHolder parametersHolder = statementProxy instanceof ParametersHolder ? (ParametersHolder)statementProxy : null;
        String limit = recognizer.getLimit(parametersHolder, paramAppenderList);
        if (StringUtils.isNotBlank(limit)) {
            suffix.append(limit);
        }
        suffix.append(" FOR UPDATE"); // add the pessimistic lock
        StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
        if (ONLY_CARE_UPDATE_COLUMNS) {
            if (!containsPK(updateColumns)) {
                selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
            }
            for (String columnName : updateColumns) {
                selectSQLJoin.add(columnName);
            }
        } else {
            for (String columnName : tableMeta.getAllColumns().keySet()) {
                selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
            }
        }
        return selectSQLJoin.toString();
    }
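The StringJoiner trick in buildBeforeImageSQL (the "SELECT " prefix and the whole FROM ... FOR UPDATE tail are fixed, and only the column list is joined) is easier to see in isolation. The sketch below is a simplified illustration: the class name, the method signature, and the sample table and columns are hypothetical, and ORDER BY, LIMIT, and identifier escaping are left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical, simplified sketch of assembling a before-image SELECT; not Seata code.
class BeforeImageSqlSketch {
    static String build(String table, String whereCondition, List<String> pkColumns, List<String> updateColumns) {
        // The suffix is completed first, so the column list can be joined in afterwards.
        StringBuilder suffix = new StringBuilder(" FROM ").append(table);
        if (whereCondition != null && !whereCondition.trim().isEmpty()) {
            suffix.append(" WHERE ").append(whereCondition);
        }
        suffix.append(" FOR UPDATE"); // pessimistic row lock on the selected rows

        StringJoiner select = new StringJoiner(", ", "SELECT ", suffix.toString());
        // Primary keys are always selected so the after image can be queried by key later.
        for (String pk : pkColumns) {
            if (!updateColumns.contains(pk)) {
                select.add(pk);
            }
        }
        for (String column : updateColumns) {
            select.add(column);
        }
        return select.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("account_tbl", "user_id = ?",
            Arrays.asList("id"), Arrays.asList("money")));
    }
}
```

For an update such as `update account_tbl set money = ? where user_id = ?`, this yields a SELECT of the primary key and the updated column with the same WHERE condition, followed by FOR UPDATE.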

Click buildTableRecords( to enter:

    protected TableRecords buildTableRecords(TableMeta tableMeta, String selectSQL, ArrayList<List<Object>> paramAppenderList) throws SQLException {
        ResultSet rs = null;
        try (PreparedStatement ps = statementProxy.getConnection().prepareStatement(selectSQL)) { // step in
            if (CollectionUtils.isNotEmpty(paramAppenderList)) {
                for (int i = 0, ts = paramAppenderList.size(); i < ts; i++) {
                    List<Object> paramAppender = paramAppenderList.get(i);
                    for (int j = 0, ds = paramAppender.size(); j < ds; j++) {
                        ps.setObject(i * ds + j + 1, paramAppender.get(j));
                    }
                }
            }
            rs = ps.executeQuery();
            return TableRecords.buildRecords(tableMeta, rs);
        } finally {
            IOUtil.close(rs);
        }
    }
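The index expression i * ds + j + 1 in buildTableRecords flattens a list of parameter groups into 1-based JDBC parameter positions. The sketch below reproduces only that arithmetic; the class and method names are hypothetical, and a sorted map stands in for the PreparedStatement so the result can be inspected.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the parameter index arithmetic; no JDBC involved.
class ParamIndexSketch {
    // Returns JDBC-style 1-based position -> value, using the same formula as the loop above.
    static Map<Integer, Object> bindIndexes(List<List<Object>> paramAppenderList) {
        Map<Integer, Object> bound = new TreeMap<>();
        for (int i = 0, ts = paramAppenderList.size(); i < ts; i++) {
            List<Object> group = paramAppenderList.get(i);
            for (int j = 0, ds = group.size(); j < ds; j++) {
                // group i starts at offset i * ds; the +1 converts to a 1-based index
                bound.put(i * ds + j + 1, group.get(j));
            }
        }
        return bound;
    }

    public static void main(String[] args) {
        List<List<Object>> groups = Arrays.asList(
            Arrays.<Object>asList(1, "a"),
            Arrays.<Object>asList(2, "b"));
        System.out.println(bindIndexes(groups));
    }
}
```

The formula only yields distinct, gap-free positions when every group has the same number of parameters, since ds is taken from the current group.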

26. Click getConnection().prepareStatement(selectSQL) to enter

AbstractConnectionProxy implements Connection

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType(); // database type, e.g. mysql or oracle
        // support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        if (BranchType.AT == RootContext.getBranchType()) { // entered only in AT mode with a global transaction open
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    // Get the table metadata
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                        sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                    // Get the primary key column names
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        // Wrap the target statement in a PreparedStatementProxy
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }

27. afterImage(beforeImage) follows the same flow as above. Click prepareUndoLog(beforeImage, afterImage):

    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        // Both images are empty: nothing to record, return
        if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
            return;
        }
        // For UPDATE, the before and after images must contain the same number of rows; otherwise throw
        if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
            if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
            }
        }
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        // For DELETE the rows no longer exist afterwards, so the lock keys come from the before image
        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        // Build the global lock key => account_tbl:11111111 (table name + primary key)
        String lockKeys = buildLockKey(lockKeyRecords);
        if (null != lockKeys) {
            // Append the global lock key and the undo log to the connection proxy
            connectionProxy.appendLockKey(lockKeys);
            SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }
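The lock key format mentioned in the comment above (table name, a colon, then the primary key, as in account_tbl:11111111) can be sketched as below. Only the single-row form comes from the article; joining several rows with "," and the columns of a composite key with "_" are assumptions made for this example, and LockKeySketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical sketch of a "table:pk" lock key; separators for multiple rows/columns are assumed.
class LockKeySketch {
    // Each inner list holds the primary key value(s) of one affected row.
    static String buildLockKey(String tableName, List<List<Object>> pkValuesPerRow) {
        if (pkValuesPerRow.isEmpty()) {
            return null; // nothing was touched, so there is nothing to lock
        }
        StringJoiner rows = new StringJoiner(",", tableName + ":", "");
        for (List<Object> pkValues : pkValuesPerRow) {
            StringJoiner columns = new StringJoiner("_");
            for (Object value : pkValues) {
                columns.add(String.valueOf(value));
            }
            rows.add(columns.toString());
        }
        return rows.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildLockKey("account_tbl",
            Arrays.asList(Arrays.<Object>asList(11111111))));
    }
}
```

Returning null for an empty row set matches the null check in prepareUndoLog, which skips both the lock key and the undo item in that case.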

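28. Introduction to the TC side:

On the TC, the business channelHandler is the inner class io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler.

This completes the source-code analysis of client-side data source initialization and execution. The next article analyzes how the server receives these messages.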
