当前位置:   article > 正文

分布式事务seata之AT模式原理分析及实战_seata at模式详解及实际应用

seata at模式详解及实际应用

seata是干什么的?

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。本文主要介绍seata中AT模式的原理以及使用方式。

seata中关键角色

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

整体机制

seata是对两阶段提交协议的演变:

  • 一阶段:业务数据和回滚日志(undo_log)记录在同一个本地事务中提交,释放本地锁和连接资源。

  • 二阶段:

    • 提交异步化,非常快速地完成。
    • 回滚通过一阶段的回滚日志进行反向补偿。

运行流程示意图如下(假设我们的业务系统有四个服务,现在模拟一次购物行为,business:聚合业务层,account:扣费,storage:减库存,order:存订单):

运行流程

  1. 在business模块中使用@GlobalTransactional注解,TM向TC发起全局事物,生成XID(全局锁),分别调用account、storage、order模块;
  2. 在account模块中进行扣费逻辑处理,这时进行写表操作,undo_log记录回滚日志,通知TC操作结果;
  3. 在storage模块中进行减库存逻辑处理,这时进行写表操作,undo_log记录回滚日志,通知TC操作结果;
  4. 在order模块中进行保存订单逻辑处理,这时进行写表操作,undo_log记录回滚日志,通知TC操作结果;
  5. 正常情况:business调用其他模块全部成功,TM通知TC全部提交,TC通知所有RM提交成功,删除本地undo_log;
  6. 异常情况:business调用其他模块出现异常,TM通知TC全局rollback,TC通知所有RM进行回滚,根据undo_log进行反向操作,还原数据,最后删除undo_log;

实战操作

  1. 环境说明

    macOS Mojave10.14.6
    spring-cloud-alibaba2.2.1.RELEASE

    spring-cloud-starter-alibaba-seata

    2.2.1.RELEASE
    seata-spring-boot-starter1.4.0
    seata1.4.0
    nacos1.3.1
    mysql5.7.17-log
  2. 搭建部署TC(seata-server)
    修改配置
    1. store {
    2. ## store mode: file、db、redis
    3. mode = "db"
    4. ## file store property
    5. file {
    6. ## store location dir
    7. dir = "sessionStore"
    8. # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    9. maxBranchSessionSize = 16384
    10. # globe session size , if exceeded throws exceptions
    11. maxGlobalSessionSize = 512
    12. # file buffer size , if exceeded allocate new buffer
    13. fileWriteBufferCacheSize = 16384
    14. # when recover batch read size
    15. sessionReloadReadSize = 100
    16. # async, sync
    17. flushDiskMode = async
    18. }
    19. ## database store property
    20. db {
    21. ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    22. datasource = "druid"
    23. ## mysql/oracle/postgresql/h2/oceanbase etc.
    24. dbType = "mysql"
    25. driverClassName = "com.mysql.jdbc.Driver"
    26. url = "jdbc:mysql://127.0.0.1:3306/seata"
    27. user = "root"
    28. password = "123456"
    29. minConn = 5
    30. maxConn = 100
    31. globalTable = "global_table"
    32. branchTable = "branch_table"
    33. lockTable = "lock_table"
    34. queryLimit = 100
    35. maxWait = 5000
    36. }
    37. ## redis store property
    38. redis {
    39. host = "127.0.0.1"
    40. port = "6379"
    41. password = ""
    42. database = "0"
    43. minConn = 1
    44. maxConn = 10
    45. maxTotal = 100
    46. queryLimit = 100
    47. }
    48. }

    sql初始化,先创建数据库seata,再执行建表sql

    1. -- the table to store GlobalSession data
    2. CREATE TABLE IF NOT EXISTS `global_table`
    3. (
    4. `xid` VARCHAR(128) NOT NULL,
    5. `transaction_id` BIGINT,
    6. `status` TINYINT NOT NULL,
    7. `application_id` VARCHAR(32),
    8. `transaction_service_group` VARCHAR(32),
    9. `transaction_name` VARCHAR(128),
    10. `timeout` INT,
    11. `begin_time` BIGINT,
    12. `application_data` VARCHAR(2000),
    13. `gmt_create` DATETIME,
    14. `gmt_modified` DATETIME,
    15. PRIMARY KEY (`xid`),
    16. KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    17. KEY `idx_transaction_id` (`transaction_id`)
    18. ) ENGINE = InnoDB
    19. DEFAULT CHARSET = utf8;
    20. -- the table to store BranchSession data
    21. CREATE TABLE IF NOT EXISTS `branch_table`
    22. (
    23. `branch_id` BIGINT NOT NULL,
    24. `xid` VARCHAR(128) NOT NULL,
    25. `transaction_id` BIGINT,
    26. `resource_group_id` VARCHAR(32),
    27. `resource_id` VARCHAR(256),
    28. `branch_type` VARCHAR(8),
    29. `status` TINYINT,
    30. `client_id` VARCHAR(64),
    31. `application_data` VARCHAR(2000),
    32. `gmt_create` DATETIME(6),
    33. `gmt_modified` DATETIME(6),
    34. PRIMARY KEY (`branch_id`),
    35. KEY `idx_xid` (`xid`)
    36. ) ENGINE = InnoDB
    37. DEFAULT CHARSET = utf8;
    38. -- the table to store lock data
    39. CREATE TABLE IF NOT EXISTS `lock_table`
    40. (
    41. `row_key` VARCHAR(128) NOT NULL,
    42. `xid` VARCHAR(96),
    43. `transaction_id` BIGINT,
    44. `branch_id` BIGINT NOT NULL,
    45. `resource_id` VARCHAR(256),
    46. `table_name` VARCHAR(32),
    47. `pk` VARCHAR(36),
    48. `gmt_create` DATETIME,
    49. `gmt_modified` DATETIME,
    50. PRIMARY KEY (`row_key`),
    51. KEY `idx_branch_id` (`branch_id`)
    52. ) ENGINE = InnoDB
    53. DEFAULT CHARSET = utf8;

    更改config.txt(https://github.com/seata/seata/tree/v1.4.0),并将其上传至nacos配置中心,其中需要修改的是store.mode、store.db.url、store.db.user、store.db.password,注意service.default.grouplist需要改成当前seata-server所在服务器的ip地址,service.vgroupMapping.my_test_tx_group中的my_test_tx_group为事务分组的名称要与微服务中配置tx-service-group的值一致,后面会说到这个配置;
    上传配置至nacos(在seata-1.4.0/script/config-center/nacos目录下):./nacos-config.sh -h 10.0.0.251
    启动seata-server并指定本机IP:./seata-server.sh -h 10.0.0.251

    1. # seata-1.4.0/script/config-center/config.txt
    2. transport.type=TCP
    3. transport.server=NIO
    4. transport.heartbeat=true
    5. transport.enableClientBatchSendRequest=false
    6. transport.threadFactory.bossThreadPrefix=NettyBoss
    7. transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
    8. transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
    9. transport.threadFactory.shareBossWorker=false
    10. transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
    11. transport.threadFactory.clientSelectorThreadSize=1
    12. transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
    13. transport.threadFactory.bossThreadSize=1
    14. transport.threadFactory.workerThreadSize=default
    15. transport.shutdown.wait=3
    16. service.vgroupMapping.my_test_tx_group=default
    17. service.default.grouplist=127.0.0.1:8091
    18. service.enableDegrade=false
    19. service.disableGlobalTransaction=false
    20. client.rm.asyncCommitBufferLimit=10000
    21. client.rm.lock.retryInterval=10
    22. client.rm.lock.retryTimes=30
    23. client.rm.lock.retryPolicyBranchRollbackOnConflict=true
    24. client.rm.reportRetryCount=5
    25. client.rm.tableMetaCheckEnable=false
    26. client.rm.sqlParserType=druid
    27. client.rm.reportSuccessEnable=false
    28. client.rm.sagaBranchRegisterEnable=false
    29. client.tm.commitRetryCount=5
    30. client.tm.rollbackRetryCount=5
    31. client.tm.defaultGlobalTransactionTimeout=60000
    32. client.tm.degradeCheck=false
    33. client.tm.degradeCheckAllowTimes=10
    34. client.tm.degradeCheckPeriod=2000
    35. store.mode=db
    36. store.file.dir=file_store/data
    37. store.file.maxBranchSessionSize=16384
    38. store.file.maxGlobalSessionSize=512
    39. store.file.fileWriteBufferCacheSize=16384
    40. store.file.flushDiskMode=async
    41. store.file.sessionReloadReadSize=100
    42. store.db.datasource=druid
    43. store.db.dbType=mysql
    44. store.db.driverClassName=com.mysql.jdbc.Driver
    45. store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
    46. store.db.user=root
    47. store.db.password=123456
    48. store.db.minConn=5
    49. store.db.maxConn=30
    50. store.db.globalTable=global_table
    51. store.db.branchTable=branch_table
    52. store.db.queryLimit=100
    53. store.db.lockTable=lock_table
    54. store.db.maxWait=5000
    55. store.redis.host=127.0.0.1
    56. store.redis.port=6379
    57. store.redis.maxConn=10
    58. store.redis.minConn=1
    59. store.redis.database=0
    60. store.redis.password=null
    61. store.redis.queryLimit=100
    62. server.recovery.committingRetryPeriod=1000
    63. server.recovery.asynCommittingRetryPeriod=1000
    64. server.recovery.rollbackingRetryPeriod=1000
    65. server.recovery.timeoutRetryPeriod=1000
    66. server.maxCommitRetryTimeout=-1
    67. server.maxRollbackRetryTimeout=-1
    68. server.rollbackRetryTimeoutUnlockEnable=false
    69. client.undo.dataValidation=true
    70. client.undo.logSerialization=jackson
    71. client.undo.onlyCareUpdateColumns=true
    72. server.undo.logSaveDays=7
    73. server.undo.logDeletePeriod=86400000
    74. client.undo.logTable=undo_log
    75. client.log.exceptionRate=100
    76. transport.serialization=seata
    77. transport.compressor=none
    78. metrics.enabled=false
    79. metrics.registryType=compact
    80. metrics.exporterList=prometheus
    81. metrics.exporterPrometheusPort=9898
  3. undo_log建表初始化
    分别在account、storage、order对应的DB中执行以下sql(由于我这里business服务是聚合服务,没有连接DB,所以不需要执行此sql):
    1. CREATE TABLE `undo_log` (
    2. `branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
    3. `xid` varchar(100) NOT NULL COMMENT 'global transaction id',
    4. `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    5. `rollback_info` longblob NOT NULL COMMENT 'rollback info',
    6. `log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
    7. `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
    8. `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
    9. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='AT transaction mode undo table';

     

  4. 搭建部署TM(business)
    配置文件(application.yml,这里只有seata相关配置,其他配置大家根据自己的情况配置):
    1. #分布式事物seata
    2. seata:
    3. enabled: true
    4. application-id: ${spring.application.name}
    5. tx-service-group: my_test_tx_group
    6. enable-auto-data-source-proxy: true
    7. config:
    8. type: nacos
    9. nacos:
    10. server-addr: 10.0.0.251:8848
    11. group: SEATA_GROUP
    12. registry:
    13. type: nacos
    14. nacos:
    15. application: seata-server
    16. server-addr: 10.0.0.251:8848
    maven依赖,RM的依赖和TM相同(为了匹配seata-server的版本,我这里自己定义了seata-spring-boot-starter的版本也为1.4.0):
    1. <dependency>
    2. <groupId>com.alibaba.cloud</groupId>
    3. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    4. <exclusions>
    5. <exclusion>
    6. <groupId>io.seata</groupId>
    7. <artifactId>seata-spring-boot-starter</artifactId>
    8. </exclusion>
    9. </exclusions>
    10. </dependency>
    11. <dependency>
    12. <groupId>io.seata</groupId>
    13. <artifactId>seata-spring-boot-starter</artifactId>
    14. <version>1.4.0</version>
    15. </dependency>

    代码入口:

    1. @GlobalTransactional(timeoutMills = 300000, name = "dubbo-seata-example")
    2. @Override
    3. public void saveOrder(OrderReqVo req) {
    4. log.info("开始全局事务,XID = " + RootContext.getXID());
    5. //扣余额
    6. boolean flag = rpcUserBalanceService.decreaseUserBalance(xxxx);
    7. if (!flag) {
    8. throw new BusinessException("XXXX");
    9. }
    10. //减库存
    11. flag = rpcStorageService.decreaseProductStock(xxxx);
    12. if (!flag) {
    13. throw new BusinessException("XXXX");
    14. }
    15. //存订单
    16. flag = rpcOrderService.saveOrder(xxxxx);
    17. if (!flag) {
    18. throw new BusinessException("XXXX");
    19. }
    20. }

     

  5. 搭建部署RM(account、storage、order)
    配置文件和maven依赖和TM一样,三个微服务模块分别创建对应方法,然后启动服务:
    1. ########每个微服务需要开启本地事务
    2. # account
    3. public boolean decreaseUserBalance(xxxxx) {
    4. return 扣款结果;
    5. }
    6. # storage
    7. public boolean decreaseProductStock(xxxxx) {
    8. return 减库存结果;
    9. }
    10. # order
    11. public boolean saveOrder(xxxxx) {
    12. return 保存订单结果;
    13. }

     

  6. 实战模拟
    模拟正常请求,查看对应数据库的数据是否正常;
    模拟出现异常时(比如扣款成功,减库存成功,保存订单失败),查看对应数据库的数据是否正常;
    debug查看分布式事务中间过程,undo_log的数据是什么样子的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/286142
推荐阅读
相关标签
  

闽ICP备14008679号