当前位置:   article > 正文

分布式事务解决方案-Seata应用实践_seata解决事务一致性

seata解决事务一致性

前言

随着微服务架构的流行,系统之间的交互变得更加复杂,事务一致性的保障成为一个挑战。Seata提供了一套完整的分布式事务解决方案,能够确保分布式系统中各个参与方的事务一致性。


一、Seata是什么?

seata官方文档icon-default.png?t=N7T8https://seata.io/zh-cn/docs/user/quickstart/

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

四大事务模式:

XA模式: XA模式是一个标准化的分布式事务处理协议,它使用两阶段提交(Two-Phase Commit,2PC)协议来确保多个资源管理器(RM)参与的分布式事务的一致性。在XA模式中,事务管理器负责协调各个参与者的资源管理器,并决定事务的提交或回滚。

AT模式: AT(Automatic Transactional)模式是Seata框架中的一种分布式事务解决方案。AT模式通过在每个参与者(服务)的数据库操作中添加适当的事务注解,使得Seata能够自动协调并管理分布式事务的提交或回滚,以保证事务的一致性。

TCC模式: TCC(Try-Confirm-Cancel)模式是一种基于补偿机制的分布式事务解决方案。在TCC模式中,事务操作被分解为三个阶段:尝试(Try)、确认(Confirm)和取消(Cancel)。参与者通过实现这三个阶段的业务逻辑,并提供相应的补偿操作,来保证分布式事务的一致性。

SAGA模式: SAGA模式是一种用于实现长事务处理的分布式事务解决方案。在SAGA模式中,事务被分解为多个连续的局部事务,每个局部事务都有相应的补偿操作。通过按照预定义的顺序执行这些局部事务和补偿操作,SAGA模式实现了分布式事务的一致性和容错性。

XAATTCCSAGA
一致性强一致弱一致弱一致最终一致
隔离性完全隔离基于全局锁隔离基于资源预留隔离无隔离
代码侵入有,需要编写3个接口有,需要编写状态机和补偿业务
性能非常好非常好
场景对一致性、隔离性有高要求的业务基于关系型数据库的大多数分布式事务场景都可以对性能要求较高的事务;有非关系型数据库要参与的事务业务流程长、业务流程多;参与者包含其它公司或遗留系统服务,无法提供TCC模式要求的三个接口

二、windows搭建

1.下载并配置

1.1、下载

seata下载icon-default.png?t=N7T8https://github.com/apache/incubator-seata/releases选择自己需要的版本下载。

1.2、解压

1.3、修改配置文件seate\seata\conf\application.yml,修改seata的config对应的nacos信息和seata的registry对应的nacos信息

配置seata的config和registry类型为nacos,并把nacos相关的配置信息完善;

  1. server:
  2. port: 7091
  3. spring:
  4. application:
  5. name: seata-server
  6. logging:
  7. config: classpath:logback-spring.xml
  8. file:
  9. path: ${user.home}/logs/seata
  10. extend:
  11. logstash-appender:
  12. destination: 127.0.0.1:4560
  13. kafka-appender:
  14. bootstrap-servers: 127.0.0.1:9092
  15. topic: logback_to_logstash
  16. console:
  17. user:
  18. username: seata
  19. password: seata
  20. seata:
  21. config:
  22. # support: nacos, consul, apollo, zk, etcd3
  23. type: nacos
  24. nacos:
  25. server-addr: 192.168.0.74:8847
  26. namespace: rfid
  27. group: SEATA_GROUP
  28. username: nacos
  29. password: nacos
  30. data-id: seataServer.properties
  31. registry:
  32. # support: nacos, eureka, redis, zk, consul, etcd3, sofa
  33. type: nacos
  34. nacos:
  35. application: seata-server
  36. server-addr: 192.168.0.74:8847
  37. group: SEATA_GROUP
  38. namespace: rfid
  39. cluster: default
  40. username: nacos
  41. password: nacos
  42. # store:
  43. # support: file 、 db 、 redis
  44. # mode: file
  45. # server:
  46. # service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  47. security:
  48. secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
  49. tokenValidityInMilliseconds: 1800000
  50. ignore:
  51. urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login

1.4、nacos添加配置文件seataServer.properties,配置seata服务的相关信息,例如数据库连接等

具体配置代码给出如下:

  1. #公共部分
  2. transport.serialization=seata
  3. transport.compressor=none
  4. transport.heartbeat=true
  5. registry.type=nacos
  6. config.type=nacos
  7. #server端
  8. server.undo.logSaveDays=7
  9. server.undo.logDeletePeriod=86400000
  10. server.maxCommitRetryTimeout=-1
  11. server.maxRollbackRetryTimeout=-1
  12. server.recovery.committingRetryPeriod=1000
  13. server.recovery.asynCommittingRetryPeriod=1000
  14. server.recovery.rollbackingRetryPeriod=1000
  15. server.recovery.timeoutRetryPeriod=1000
  16. #存储模式选db
  17. store.mode=db
  18. store.db.datasource=druid
  19. store.db.dbType=mysql
  20. store.db.driverClassName=com.mysql.cj.jdbc.Driver
  21. #这里url跟上rewriteBatchedStatements=true,原因看官网-参数配置-附录7,增加批量插入效率
  22. store.db.url=jdbc:mysql://192.168.10.23:19131/seata?useUnicode=true$rewriteBatchedStatements=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
  23. store.db.user=root
  24. store.db.password=Digitalor@MCRFID1688
  25. #默认120,稍微调大点
  26. store.db.minConn=5
  27. store.db.maxConn=30
  28. store.db.maxWait=5000
  29. store.db.globalTable=global_table
  30. store.db.branchTable=branch_table
  31. store.db.lockTable=lock_table
  32. store.db.queryLimit=100
  33. #监控,只支持prometheus
  34. metrics.enabled=false
  35. metrics.registryType=compact
  36. metrics.exporterList=prometheus
  37. metrics.exporterPrometheusPort=9898
  38. #client端
  39. seata.enabled=true
  40. seata.enableAutoDataSourceProxy=true
  41. seata.useJdkProxy=false
  42. transport.enableClientBatchSendRequest=true
  43. client.log.exceptionRate=100
  44. # 全局事务Mysql数据的自动清理,默认为false,防止数据过大
  45. service.disableGlobalTransaction=false
  46. client.tm.degradeCheck=false
  47. client.tm.degradeCheckAllowTimes=10
  48. client.tm.degradeCheckPeriod=2000
  49. client.rm.reportSuccessEnable=false
  50. client.rm.asyncCommitBufferLimit=10000
  51. client.rm.lock.retryInterval=10
  52. client.rm.lock.retryTimes=30
  53. client.rm.lock.retryPolicyBranchRollbackOnConflict=true
  54. client.rm.reportRetryCount=5
  55. client.rm.tableMetaCheckEnable=false
  56. #一阶段全局提交和回滚结果上报TC重试次数,默认1,这里改成3
  57. client.tm.commitRetryCount=3
  58. client.tm.rollbackRetryCount=3
  59. client.undo.dataValidation=true
  60. client.undo.logSerialization=jackson
  61. client.undo.logTable=undo_log
  62. client.undo.onlyCareUpdateColumns=true
  63. client.rm.sqlParserType=druid
  64. #自定义事务组my_tx_group
  65. service.vgroupMapping.my_tx_group=default

1.5、mysql建表

1.5.1、seata数据库四张表

global_table:全局事务信息

branch_table :事务参与者信息(分支事务)

lock_table:全局锁

distributed_lock:分布式锁

建表语句地址

  1. -- -------------------------------- The script used when storeMode is 'db' --------------------------------
  2. -- the table to store GlobalSession data
  3. CREATE TABLE IF NOT EXISTS `global_table`
  4. (
  5. `xid` VARCHAR(128) NOT NULL,
  6. `transaction_id` BIGINT,
  7. `status` TINYINT NOT NULL,
  8. `application_id` VARCHAR(32),
  9. `transaction_service_group` VARCHAR(32),
  10. `transaction_name` VARCHAR(128),
  11. `timeout` INT,
  12. `begin_time` BIGINT,
  13. `application_data` VARCHAR(2000),
  14. `gmt_create` DATETIME,
  15. `gmt_modified` DATETIME,
  16. PRIMARY KEY (`xid`),
  17. KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
  18. KEY `idx_transaction_id` (`transaction_id`)
  19. ) ENGINE = InnoDB
  20. DEFAULT CHARSET = utf8mb4;
  21. -- the table to store BranchSession data
  22. CREATE TABLE IF NOT EXISTS `branch_table`
  23. (
  24. `branch_id` BIGINT NOT NULL,
  25. `xid` VARCHAR(128) NOT NULL,
  26. `transaction_id` BIGINT,
  27. `resource_group_id` VARCHAR(32),
  28. `resource_id` VARCHAR(256),
  29. `branch_type` VARCHAR(8),
  30. `status` TINYINT,
  31. `client_id` VARCHAR(64),
  32. `application_data` VARCHAR(2000),
  33. `gmt_create` DATETIME(6),
  34. `gmt_modified` DATETIME(6),
  35. PRIMARY KEY (`branch_id`),
  36. KEY `idx_xid` (`xid`)
  37. ) ENGINE = InnoDB
  38. DEFAULT CHARSET = utf8mb4;
  39. -- the table to store lock data
  40. CREATE TABLE IF NOT EXISTS `lock_table`
  41. (
  42. `row_key` VARCHAR(128) NOT NULL,
  43. `xid` VARCHAR(128),
  44. `transaction_id` BIGINT,
  45. `branch_id` BIGINT NOT NULL,
  46. `resource_id` VARCHAR(256),
  47. `table_name` VARCHAR(32),
  48. `pk` VARCHAR(36),
  49. `status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
  50. `gmt_create` DATETIME,
  51. `gmt_modified` DATETIME,
  52. PRIMARY KEY (`row_key`),
  53. KEY `idx_status` (`status`),
  54. KEY `idx_branch_id` (`branch_id`),
  55. KEY `idx_xid` (`xid`)
  56. ) ENGINE = InnoDB
  57. DEFAULT CHARSET = utf8mb4;
  58. CREATE TABLE IF NOT EXISTS `distributed_lock`
  59. (
  60. `lock_key` CHAR(20) NOT NULL,
  61. `lock_value` VARCHAR(20) NOT NULL,
  62. `expire` BIGINT,
  63. primary key (`lock_key`)
  64. ) ENGINE = InnoDB
  65. DEFAULT CHARSET = utf8mb4;
  66. INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
  67. INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
  68. INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
  69. INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
1.5.2、各分支数据库undo_log表
  1. CREATE TABLE `undo_log` (
  2. `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  3. `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  4. `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  5. `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  6. `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
  7. `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  8. `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  9. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
  10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';

1.6、seata启动,运行seate\seata\bin\seata-server.bat

1.7、客户端展示,登录seata client对事务信息进行查看,地址http://localhost:7091,账号密都码默认为:seata

在你的事务进行过程中,可以在client查到相关的事务信息,信息在事务结束后会被删除,是为了防止数据库无用数据过大,你也可以在配置里定义不删除。

2.Seata实践(AT模式)

2.1、项目maven包下载

我本地alibaba-cloud版本为 2.2.6.RELEASE,其对应的seata版本为1.3.0,但是想使用seata1.5.2的版本,故需要替换seata核心包

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  4. <exclusions>
  5. <exclusion>
  6. <groupId>io.seata</groupId>
  7. <artifactId>seata-spring-boot-starter</artifactId>
  8. </exclusion>
  9. </exclusions>
  10. </dependency>
  11. <dependency>
  12. <groupId>io.seata</groupId>
  13. <artifactId>seata-spring-boot-starter</artifactId>
  14. <version>1.5.2</version>
  15. </dependency>

2.2、本地yml(properties)配置

  1. # seata配置
  2. seata.enabled=true
  3. # 启用自动数据源代理功能
  4. seata.enable-auto-data-source-proxy=true
  5. # Seata 应用编号,默认为 ${spring.application.name}
  6. seata.application-id=${spring.application.name}
  7. # Seata 事务组编号,用于 TC 集群名
  8. seata.tx-service-group=my_tx_group
  9. seata.config.type=nacos
  10. seata.config.nacos.server-addr=192.168.0.74:8847
  11. seata.config.nacos.namespace=rfid
  12. seata.config.nacos.group=SEATA_GROUP
  13. seata.config.nacos.username=nacos
  14. seata.config.nacos.password=nacos
  15. seata.config.nacos.data-id=seataServer.properties
  16. seata.registry.type=nacos
  17. seata.registry.nacos.application=seata-server
  18. seata.registry.nacos.server-addr=192.168.0.74:8847
  19. seata.registry.nacos.namespace=rfid
  20. seata.registry.nacos.group=SEATA_GROUP
  21. seata.registry.nacos.cluster=default
  22. seata.registry.nacos.username=nacos
  23. seata.registry.nacos.password=nacos
  24. seata.service.vgroup-mapping.my_tx_group=default

2.3、使用@GlobalTransactional处理分布式事务的方法,在项目中使用AT模式对现有代码进行无侵入式分布式事务改造

在代码发生异常即会触发回滚

  1. @GlobalTransactional(timeoutMills = 10 * 60 * 1000, rollbackFor = Exception.class)
  2. public Result test(){
  3. }

2.4、@GlobalTransactional使用注意点

2.4.1、注意@GlobalTransaction和@Transaction的联动使用中事务不生效的问题,最好被调用方的接口不使用Transaction。
2.4.2、@GlobalTransactional在使用多线程时的xid失效问题,每个子线程都不受主线程的xid管控,需要将xid传入子线程,并给子线程重新绑定xid。

代码示例:

  1. @GlobalTransactional(timeoutMills = 10 * 60 * 1000, rollbackFor = Exception.class)
  2. public Result batchPut(List<Demo> list) throws InterruptedException {
  3. CopyOnWriteArrayList<String> modelIdMessageList = new CopyOnWriteArrayList();
  4. CopyOnWriteArrayList<String> exceptionList = new CopyOnWriteArrayList();
  5. // 获取全局事务ID
  6. String xid = RootContext.getXID();
  7. CountDownLatch countDownLatch = new CountDownLatch(list.size());
  8. list.forEach(re->
  9. threadPoolTaskExecutor.execute(() -> {
  10. this.putA(
  11. countDownLatch,
  12. xid
  13. );
  14. })
  15. );
  16. // 等待子线程全部完成
  17. countDownLatch.await();
  18. // 消息队列补偿、redis补偿
  19. if (exceptionList.size() > 0) {
  20. ...
  21. throw new RuntimeException("user-defined GlobalTransactional rollback");
  22. }
  23. return Result.success();
  24. }
  25. public void putA(
  26. CountDownLatch countDownLatch,
  27. String xid
  28. ) {
  29. try {
  30. // 绑定全局事务ID
  31. RootContext.bind(xid);
  32. ...
  33. } catch (Exception e) {
  34. } finally {
  35. // 释放全局事务ID
  36. RootContext.unbind();
  37. // 线程信号量减一
  38. countDownLatch.countDown();
  39. }
  40. }


三、Linux搭建

linux服务器搭建seata和本地没有太大的区别,只是需要把seata运行到服务器上就行,不管是以jar包的方式运行,还是以docker容器的方式运行。

1、以jar包的方式在服务器运行

1)、linux服务器下载seata-server-1.5.2.zip或者本地下载上传到服务器,解压zip(命令:unzip seata-server-1.5.2.zip);

2)、其他配置和本地配置一样,都改为服务器相关信息的配置即可;

3)、启动seata,运行seate\seata\bin\seata-server.sh;也可使用自定义脚本start_seata.sh启动,可直接使用也可参考优化;

  1. cd /home/jenkins/apps/seata/bin
  2. sh seata-server.sh -h 192.168.100.100 -p 8091

4)注意:服务器可能没定义JAVA_HOME,启动过程中可能会报错,使用命令定义

export JAVA_HOME=/path/to/java

2、以容器的方式在服务器运行

操作基本类似,只是需要下载seata对应版本镜像在容器中运行,配置相关文件;


总结

对自己在学习和应用seata AT模式的过程进行了一个梳理,也给自己加强一下印象,后续有机会的话会应用TCC模式并进行文章输出。SAGA模式也曾使用过,有机会也会整理一个使用思路出来。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号