当前位置:   article > 正文

分布式事务-Seata_io.seata.config.fileconfiguration : filelistener e

io.seata.config.fileconfiguration : filelistener execute error, dataid :serv

1. 介绍

1.1 什么是Seata

Seata 是阿里开源的分布式事务框架,属于二阶段提交模式,为用户提供了AT、TCC、SAGA 和 XA 事务模式。 

1.2 如何实现

在微服务架构中,以下3个模块会变为3个独立的微服务,各自有自己的数据源,Seata 调用逻辑就变为:

 

「推荐」阿里开源的分布式事务框架 Seata

Business 是业务入口,在程序中会通过注解来说明他是一个全局事务,这时他的角色为 TM(事务管理者)。

  • Business 会请求 TC(事务协调器,一个独立运行的服务),说明自己要开启一个全局事务,TC 会生成一个全局事务ID(XID),并返回给 Business。
  • Business 得到 XID 后,开始调用微服务,例如调用 Storage。
  • Storage 会收到 XID,知道自己的事务属于这个全局事务。Storage 执行自己的业务逻辑,操作本地数据库。
  • Storage 会把自己的事务注册到 TC,作为这个 XID 下面的一个分支事务,并且把自己的事务执行结果也告诉 TC。
  • 此时 Storage 的角色是 RM(资源管理者),资源是指本地数据库。
  • Order、Account 的执行逻辑与 Storage 一致。
  • 在各个微服务都执行完成后,TC 可以知道 XID 下各个分支事务的执行结果,TM(Business) 也就知道了。
  • Business 如果发现各个微服务的本地事务都执行成功了,就请求 TC 对这个 XID 提交,否则回滚。
  • TC 收到请求后,向 XID 下的所有分支事务发起相应请求。
  • 各个微服务收到 TC 的请求后,执行相应指令,并把执行结果上报 TC。

1.3 重要机制

(1)全局事务的回滚是如何实现的呢?

Seata 有一个重要的机制:回滚日志。每个分支事务对应的数据库中都需要有一个回滚日志表 UNDO_LOG,在真正修改数据库记录之前,都会先记录修改前的记录值,以便之后回滚。在收到回滚请求后,就会根据 UNDO_LOG 生成回滚操作的 SQL 语句来执行。如果收到的是提交请求,就把 UNDO_LOG 中的相应记录删除掉。

(2)RM 是怎么自动和 TC 交互的?

是通过监控拦截JDBC实现的,例如监控到开启本地事务了,就会自动向 TC 注册、生成回滚日志、向 TC 汇报执行结果。

(3)二阶段回滚失败怎么办?

例如 TC 命令各个 RM 回滚的时候,有一个微服务挂掉了,那么所有正常的微服务也都不会执行回滚,当这个微服务重新正常运行后,TC 会重新执行全局回滚。

1.4 核心组件

  • 事务协调器 TC:维护全局和分支事务的状态,指示全局提交或者回滚(Seata服务端)。
  • 事务管理者 TM:开启、提交或者回滚一个全局事务(事务牵头,谁需要执行全局事务,谁就是TM)。
  • 资源管理者 RM:管理执行分支事务的那些资源,向TC注册分支事务、上报分支事务状态、控制分支事务的提交或者回滚。
  • 注册中心:客户端注册中心(位于各个分布式项目中的registry.conf配置文件中的registry.type参数)其实指的是在哪里发现seata-server,因为为了支持HA高可用seata-server可能是集群的,比如交由zookeeper管理,那么客户端只需向zk去发现seata-server即可。如果仅适用单一的seata-server,无需HA高可用的支持,可以使用file作为注册中心类型。
  • 配置中心客户端配置和服务端配置,就是管理服务端和客户端关于seata的相关的参数设置,如果无需统一管理配置使用file即可,不会影响高可用,但最好也使用一个第三方配置中心 。

(1)注意

  • 不推荐registry.type=file:因为当registry.type=file时,说明这里用的不是真正的注册中心,不具备集群内服务的健康检查机制当tc(seata-server)不可用时无法自动剔除列表,推荐使用nacos 、eureka、redis、zk、consul、etcd3、sofa。registry.type=file或config.type=file 设计的初衷是让用户再不依赖第三方注册中心或配置中心的前提下,通过直连的方式,快速验证seata服务。
  • HA高可用说明如果使用了seata-server集群为了保证数据 的一致性,服务端的配置参数store.mode就不能使用file类型了,不然会报错。需要使用db类型,将集群中所有的seata-server的数据存储位置指向同一个DB或DB集群。如果DB使用mysql时使用过高版本可能会出现一些问题,之前使用mysql8.0作为db存储遇到些问题,降到5.x就好了

1.5 如何证明Seata满足事务的ACID特性

TBD

2. 源码

2.1 Seata服务端启动

从官网下载Seata服务端后,conf目录下会有两个配置文件(file.conf和registry.conf),file.conf应该是存储信息用的,registry.conf代表启动的时候向哪个注册中心注册,以便后续健康检查用的。默认都是file形式。

启动命令(sh seata-server.sh -p 8091 -h 127.0.0.1 -m file),fescar版本的服务端不能指定域名信息。

2.2 客户端初始化

SeataAutoConfiguration作为配置入口,完成初始化工作。初始化的时候,会开启很多定时任务或者线程池,有的会隐藏在类实例化内(难找)。

  1. // io.seata.spring.annotation.GlobalTransactionScanner#initClient
  2. private void initClient() {
  3. // ...
  4. //TM初始化
  5. TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
  6. // ...
  7. //RM初始化
  8. RMClient.init(applicationId, txServiceGroup);
  9. // ...
  10. }

2.2.1 根据配置中心类型,获取Seata服务端地址信息

不论Seata配置的配置中心类型是什么,优先从系统配置获取值信息(System.getProperty)。也就是说如果项目本身就集成例如nacos的,理论上不会触发seata配置的nacos地址去读取需要的配置信息。

  • file
  1. // io.seata.config.FileConfiguration#getLatestConfig
  2. public String getLatestConfig(String dataId, String defaultValue, long timeoutMills) {
  3. // 获取系统配置(key为service.vgroupMapping.XXXX-seata-service-group)
  4. String value = getConfigFromSysPro(dataId);
  5. if (value != null) {
  6. return value;
  7. }
  8. // 异步获取本地文件配置
  9. ConfigFuture configFuture = new ConfigFuture(dataId, defaultValue, ConfigOperation.GET, timeoutMills);
  10. configOperateExecutor.submit(new ConfigOperateRunnable(configFuture));
  11. Object getValue = configFuture.get();
  12. return getValue == null ? null : String.valueOf(getValue);
  13. }
  • 省略

2.2.2 根据注册中心类型,检测Seata服务是否可用

根据配置的注册中心类型,走不同的RegistryProvider实现。所有获取Seata服务端地址的时候,会从配置中拉取配置的地址信息,再根据注册中心类型进行健康检查,剔除一些不可用的Seata服务端,保证Seata服务端的高可用。

(1)ServiceLoader加载器加载(这个会加载指定接口的实现)

  1. // io.seata.discovery.registry.RegistryFactory#buildRegistryService
  2. private static RegistryService buildRegistryService() {
  3. RegistryType registryType;
  4. String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
  5. ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
  6. + ConfigurationKeys.FILE_ROOT_TYPE);
  7. try {
  8. registryType = RegistryType.getType(registryTypeName);
  9. } catch (Exception exx) {
  10. throw new NotSupportYetException("not support registry type: " + registryTypeName);
  11. }
  12. if (RegistryType.File == registryType) {
  13. // 文件类型直接返回实现
  14. return FileRegistryServiceImpl.getInstance();
  15. } else {
  16. return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
  17. }
  18. }

(2) 检测Seata服务是否可用

以Zk为例。

  1. // io.seata.discovery.registry.zk.ZookeeperRegisterServiceImpl#lookup
  2. public List<InetSocketAddress> lookup(String key) throws Exception {
  3. // 获取配置的Seata服务地址
  4. String clusterName = getServiceGroup(key);
  5. if (clusterName == null) {
  6. return null;
  7. }
  8. // 根据不同的注册中心类型检测服务是否可用
  9. return doLookup(clusterName);
  10. }
  11. // io.seata.discovery.registry.zk.ZookeeperRegisterServiceImpl#doLookup
  12. List<InetSocketAddress> doLookup(String clusterName) throws Exception {
  13. boolean exist = getClientInstance().exists(ROOT_PATH + clusterName);
  14. if (!exist) {
  15. return null;
  16. }
  17. if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
  18. List<String> childClusterPath = getClientInstance().getChildren(ROOT_PATH + clusterName);
  19. refreshClusterAddressMap(clusterName, childClusterPath);
  20. subscribeCluster(clusterName);
  21. }
  22. return CLUSTER_ADDRESS_MAP.get(clusterName);
  23. }

2.3 事务执行过程

2.3.1 GlobalTransactional注解开启全局事务

  1. @GlobalTransactional(timeoutMills = 300000, name = "spring-cloud-demo-tx")
  2. @RequestMapping(value = "/order", method = RequestMethod.POST, produces = "application/json")
  3. public String order(String userId, String commodityCode, int orderCount) {
  4. // ...
  5. }

2.3.2 GlobalTransactiona注解AOP实现

(1)GlobalTransactionScanner继承AbstractAutoProxyCreator,实现全局事务拦截器的加载(AOP原理)

  1. // io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary
  2. protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
  3. try {
  4. synchronized (PROXYED_SET) {
  5. if (PROXYED_SET.contains(beanName)) {
  6. return bean;
  7. }
  8. interceptor = null;
  9. //check TCC proxy
  10. if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
  11. //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
  12. interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
  13. ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
  14. (ConfigurationChangeListener)interceptor);
  15. } else {
  16. Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
  17. Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
  18. if (!existsAnnotation(new Class[]{serviceInterface})
  19. && !existsAnnotation(interfacesIfJdk)) {
  20. return bean;
  21. }
  22. if (interceptor == null) {
  23. if (globalTransactionalInterceptor == null) {
  24. globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
  25. ConfigurationCache.addConfigListener(
  26. ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
  27. (ConfigurationChangeListener)globalTransactionalInterceptor);
  28. }
  29. interceptor = globalTransactionalInterceptor;
  30. }
  31. }
  32. LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
  33. if (!AopUtils.isAopProxy(bean)) {
  34. bean = super.wrapIfNecessary(bean, beanName, cacheKey);
  35. } else {
  36. AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
  37. Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
  38. for (Advisor avr : advisor) {
  39. advised.addAdvisor(0, avr);
  40. }
  41. }
  42. PROXYED_SET.add(beanName);
  43. return bean;
  44. }
  45. } catch (Exception exx) {
  46. throw new RuntimeException(exx);
  47. }
  48. }

2.3.3 @GlobalTransactional注解AOP拦截

  1. // io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
  2. public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
  3. Class<?> targetClass =
  4. methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
  5. Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
  6. if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
  7. final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
  8. final GlobalTransactional globalTransactionalAnnotation =
  9. getAnnotation(method, targetClass, GlobalTransactional.class);
  10. final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
  11. boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
  12. if (!localDisable) {
  13. if (globalTransactionalAnnotation != null) {
  14. // 处理全局事务
  15. return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
  16. } else if (globalLockAnnotation != null) {
  17. return handleGlobalLock(methodInvocation, globalLockAnnotation);
  18. }
  19. }
  20. }
  21. return methodInvocation.proceed();
  22. }

 2.3.4 TM向Seata服务端申请XID

XID是Seata分布式事务的一个全局上下文,不论各个项目通过何种方式调用(Dubbo、ACM、SringCloud、Http等等),都需要投传这个XID,例如Dubbo可以用拦截器拦截进行透传绑定;本地为了方便可以直接利用Http透传给下个业务系统进行绑定。

 (1)TM申请XID

  1. // io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object)
  2. public Object sendSyncRequest(Object msg) throws TimeoutException {
  3. // 获取可用的Seata服务端地址
  4. String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
  5. int timeoutMillis = NettyClientConfig.getRpcRequestTimeout();
  6. RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
  7. // 发送异步请求,获取XID信息
  8. if (NettyClientConfig.isEnableClientBatchSendRequest()) {
  9. // send batch message is sync request, needs to create messageFuture and put it in futures.
  10. MessageFuture messageFuture = new MessageFuture();
  11. messageFuture.setRequestMessage(rpcMessage);
  12. messageFuture.setTimeout(timeoutMillis);
  13. futures.put(rpcMessage.getId(), messageFuture);
  14. // put message into basketMap
  15. BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
  16. key -> new LinkedBlockingQueue<>());
  17. if (!basket.offer(rpcMessage)) {
  18. LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
  19. serverAddress, rpcMessage);
  20. return null;
  21. }
  22. if (LOGGER.isDebugEnabled()) {
  23. LOGGER.debug("offer message: {}", rpcMessage.getBody());
  24. }
  25. if (!isSending) {
  26. synchronized (mergeLock) {
  27. mergeLock.notifyAll();
  28. }
  29. }
  30. try {
  31. return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
  32. } catch (Exception exx) {
  33. LOGGER.error("wait response error:{},ip:{},request:{}",
  34. exx.getMessage(), serverAddress, rpcMessage.getBody());
  35. if (exx instanceof TimeoutException) {
  36. throw (TimeoutException) exx;
  37. } else {
  38. throw new RuntimeException(exx);
  39. }
  40. }
  41. } else {
  42. Channel channel = clientChannelManager.acquireChannel(serverAddress);
  43. return super.sendSync(channel, rpcMessage, timeoutMillis);
  44. }
  45. }

 (2)下游系统绑定上游系统的XID信息

  1. @RequestMapping(value = "/account", method = RequestMethod.POST, produces = "application/json")
  2. public String account(String userId, int money, String xid) {
  3. // 拿到上游系统的xid后,需要进行绑定xid,不然谁知道你是啥
  4. RootContext.bind(xid);
  5. LOGGER.info("Account Service ... xid: " + RootContext.getXID());
  6. int result = jdbcTemplate.update(
  7. "update account_tbl set money = money - ? where user_id = ?",
  8. new Object[] { money, userId });
  9. LOGGER.info("Account Service End ... ");
  10. if (result == 1) {
  11. return SUCCESS;
  12. }
  13. return FAIL;
  14. }

 

2.3.5 正常业务操作

  1. // io.seata.tm.api.TransactionalTemplate#execute
  2. public Object execute(TransactionalExecutor business) throws Throwable {
  3. // ...
  4. try {
  5. // 开启全局事务,主要就是绑定xid
  6. beginTransaction(txInfo, tx);
  7. Object rs;
  8. try {
  9. // 业务内容执行
  10. rs = business.execute();
  11. } catch (Throwable ex) {
  12. // 如果是指定错误类型(可扩展),就回滚,否则就提交
  13. completeTransactionAfterThrowing(txInfo, tx, ex);
  14. throw ex;
  15. }
  16. // 通知TC提交事务
  17. commitTransaction(tx);
  18. return rs;
  19. } finally {
  20. //5. clear
  21. resumeGlobalLockConfig(previousConfig);
  22. triggerAfterCompletion();
  23. cleanUp();
  24. }
  25. } finally {
  26. // If the transaction is suspended, resume it.
  27. if (suspendedResourcesHolder != null) {
  28. tx.resume(suspendedResourcesHolder);
  29. }
  30. }
  31. }

2.3.6 DataSourceProxy对数据库SQL进行拦截

本地Mysql事务开启的时候需要用Seata数据库代理类进行拦截处理。

  1. public JdbcTemplate jdbcTemplate(DruidDataSource druidDataSource) {
  2. // 使用Seata数据库代理类
  3. DataSourceProxy dataSourceProxy = new DataSourceProxy(druidDataSource);
  4. JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSourceProxy);
  5. // ...
  6. return jdbcTemplate;
  7. }

2.3.7 执行自定义SQL

  1. // org.springframework.jdbc.core.JdbcTemplate#execute(org.springframework.jdbc.core.PreparedStatementCreator, org.springframework.jdbc.core.PreparedStatementCallback<T>)
  2. public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action) throws DataAccessException {
  3. // ...
  4. try {
  5. // 获取Sql预处理表达式,AbstractConnectionProxy会进行拦截处理
  6. ps = psc.createPreparedStatement(con);
  7. this.applyStatementSettings(ps);
  8. T result = action.doInPreparedStatement(ps);
  9. this.handleWarnings((Statement)ps);
  10. var13 = result;
  11. } catch (SQLException var10) {
  12. // ...
  13. } finally {
  14. // ...
  15. }
  16. return var13;
  17. }

 2.3.8 预处理SQL

这里似乎主要是针对AT模式下,Undo_log的插入语句进行拦截预处理。

  1. // io.seata.rm.datasource.AbstractConnectionProxy#prepareStatement(java.lang.String)
  2. public PreparedStatement prepareStatement(String sql) throws SQLException {
  3. String dbType = getDbType();
  4. // support oracle 10.2+
  5. PreparedStatement targetPreparedStatement = null;
  6. if (BranchType.AT == RootContext.getBranchType()) {
  7. List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
  8. if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
  9. SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
  10. if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
  11. // 这里主要识别Undo_log的sql语句,但是其本身是插入语句的话没试过
  12. TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
  13. sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
  14. String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
  15. tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
  16. targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
  17. }
  18. }
  19. }
  20. if (targetPreparedStatement == null) {
  21. targetPreparedStatement = getTargetConnection().prepareStatement(sql);
  22. }
  23. return new PreparedStatementProxy(this, targetPreparedStatement, sql);
  24. }

2.3.9 全局事务提交

  1. // io.seata.rm.datasource.ConnectionProxy#doCommit
  2. private void doCommit() throws SQLException {
  3. if (context.inGlobalTransaction()) {
  4. // 如果上下文中能拿到xid,进来(如果没绑定xid,那么就不会当作分布式事务处理)
  5. processGlobalTransactionCommit();
  6. } else if (context.isGlobalLockRequire()) {
  7. processLocalCommitWithGlobalLocks();
  8. } else {
  9. targetConnection.commit();
  10. }
  11. }
  1. // io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit
  2. private void processGlobalTransactionCommit() throws SQLException {
  3. try {
  4. // 向TC注册分支ID(branchId)
  5. register();
  6. } catch (TransactionException e) {
  7. recognizeLockKeyConflictException(e, context.buildLockKeys());
  8. }
  9. try {
  10. UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
  11. targetConnection.commit();
  12. } catch (Throwable ex) {
  13. LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
  14. // 事务异常后向TC,也就是Seata服务端汇报失败
  15. report(false);
  16. throw new SQLException(ex);
  17. }
  18. if (IS_REPORT_SUCCESS_ENABLE) {
  19. // 事务异常后向TC,也就是Seata服务端汇报成功
  20. report(true);
  21. }
  22. context.reset();
  23. }

(1)插入undo_log的sql

  1. // io.seata.rm.datasource.undo.AbstractUndoLogManager#flushUndoLogs
  2. public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
  3. // ...
  4. BranchUndoLog branchUndoLog = new BranchUndoLog();
  5. branchUndoLog.setXid(xid);
  6. branchUndoLog.setBranchId(branchId);
  7. branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
  8. UndoLogParser parser = UndoLogParserFactory.getInstance();
  9. // 这里将本次执行的sql信息转换成二进制
  10. byte[] undoLogContent = parser.encode(branchUndoLog);
  11. // ...
  12. insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,
  13. cp.getTargetConnection());
  14. }

2.3.10 RM接收TC发送的全局事务的成功或者失败信息

 (1)RM初始化的时候会启动clientBootstrap

NettyClientBootstrap内部包装了netty的Bootstrap,其中会向Bootstrap绑定一个ClientHandler用来处理seata server返回来的消息,最终通过Bootstrap建立与server端的连接。

  1. // io.seata.core.rpc.netty.AbstractNettyRemotingClient#init
  2. public void init() {
  3. // ...
  4. clientBootstrap.start();
  5. }

 (2)RM客户端接收TC消息

  1. // io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler#channelRead
  2. public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  3. if (!(msg instanceof RpcMessage)) {
  4. return;
  5. }
  6. processMessage(ctx, (RpcMessage) msg);
  7. }

(3)处理TC消息

根据TC返回的消息,例如分支提交、回滚、删除Undo_log信息等等。

  1. //
  2. protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  3. // ...
  4. Object body = rpcMessage.getBody();
  5. if (body instanceof MessageTypeAware) {
  6. MessageTypeAware messageTypeAware = (MessageTypeAware) body;
  7. final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
  8. if (pair != null) {
  9. if (pair.getSecond() != null) {
  10. try {
  11. pair.getSecond().execute(() -> {
  12. try {
  13. // 根据不同的消息类型执行不同的处理器
  14. pair.getFirst().process(ctx, rpcMessage);
  15. } catch (Throwable th) {
  16. LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  17. }
  18. });
  19. } catch (RejectedExecutionException e) {
  20. LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
  21. "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
  22. if (allowDumpStack) {
  23. String name = ManagementFactory.getRuntimeMXBean().getName();
  24. String pid = name.split("@")[0];
  25. int idx = new Random().nextInt(100);
  26. try {
  27. Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
  28. } catch (IOException exx) {
  29. LOGGER.error(exx.getMessage());
  30. }
  31. allowDumpStack = false;
  32. }
  33. }
  34. } else {
  35. try {
  36. pair.getFirst().process(ctx, rpcMessage);
  37. } catch (Throwable th) {
  38. LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  39. }
  40. }
  41. } else {
  42. LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
  43. }
  44. } else {
  45. LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
  46. }
  47. }

3. 实战

参照【springboot集成分布式事务Seata

4. FAQ

4.1 为什么Seata在第一阶段就直接提交了分支事务

Seata能够在第一阶段直接提交事务,是因为Seata框架为每一个RM维护了一张UNDO_LOG表(这张表需要客户端自行创建),其中保存了每一次本地事务的回滚数据。因此,二阶段的回滚并不依赖于本地数据库事务的回滚,而是RM直接读取这张UNDO_LOG表,并将数据库中的数据更新为UNDO_LOG中存储的历史数据。如果第二阶段是提交命令,那么RM事实上并不会对数据进行提交(因为一阶段已经提交了),而实发起一个异步请求删除UNDO_LOG中关于本事务的记录。

4.2 使用fescar遇到can not register RM,err:can not connect to fescar-server

SpringBoot项目集成的是seata-spring-boot-starter,结果用的服务端是Fescar。这两个还是有区别的(Seata是Fescar改名后的产物),如果项目中集成seata-spring-boot-starter,就去官网下载seata服务端。网上说是因为内网IP的问题,涉及到改Fescar服务端源码。Fescar服务端不能指定IP地址,Seata可以指定IP地址(sh seata-server.sh -p 8091 -h 127.0.0.1 -m file)。

4.3 Netty相关操作

4.4 AT、TCC、SAGA 和 XA相关概念

4.5 分库分表情况下,分布式事务是如何操作

5. 参考资料

springboot集成分布式事务Seata

Seata中文官网

阿里开源的分布式事务框架 Seata

MySQL 5.7中对XA支持的改进

阿里分布式事务框架Seata原理解析

seata源码解析系列-AT模式

Seata实战-AT模式分布式事务原理、源码分析

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/274863
推荐阅读
相关标签
  

闽ICP备14008679号