赞
踩
前言
随着微服务架构的流行,系统之间的交互变得更加复杂,事务一致性的保障成为一个挑战。Seata提供了一套完整的分布式事务解决方案,能够确保分布式系统中各个参与方的事务一致性。
seata官方文档https://seata.io/zh-cn/docs/user/quickstart/
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
XA模式: XA模式是一个标准化的分布式事务处理协议,它使用两阶段提交(Two-Phase Commit,2PC)协议来确保多个资源管理器(RM)参与的分布式事务的一致性。在XA模式中,事务管理器负责协调各个参与者的资源管理器,并决定事务的提交或回滚。
AT模式: AT(Automatic Transactional)模式是Seata框架中的一种分布式事务解决方案。AT模式通过在每个参与者(服务)的数据库操作中添加适当的事务注解,使得Seata能够自动协调并管理分布式事务的提交或回滚,以保证事务的一致性。
TCC模式: TCC(Try-Confirm-Cancel)模式是一种基于补偿机制的分布式事务解决方案。在TCC模式中,事务操作被分解为三个阶段:尝试(Try)、确认(Confirm)和取消(Cancel)。参与者通过实现这三个阶段的业务逻辑,并提供相应的补偿操作,来保证分布式事务的一致性。
SAGA模式: SAGA模式是一种用于实现长事务处理的分布式事务解决方案。在SAGA模式中,事务被分解为多个连续的局部事务,每个局部事务都有相应的补偿操作。通过按照预定义的顺序执行这些局部事务和补偿操作,SAGA模式实现了分布式事务的一致性和容错性。
XA | AT | TCC | SAGA | |
一致性 | 强一致 | 弱一致 | 弱一致 | 最终一致 |
隔离性 | 完全隔离 | 基于全局锁隔离 | 基于资源预留隔离 | 无隔离 |
代码侵入 | 无 | 无 | 有,需要编写3个接口 | 有,需要编写状态机和补偿业务 |
性能 | 差 | 好 | 非常好 | 非常好 |
场景 | 对一致性、隔离性有高要求的业务 | 基于关系型数据库的大多数分布式事务场景都可以 | 对性能要求较高的事务;有非关系型数据库要参与的事务 | 业务流程长、业务流程多;参与者包含其它公司或遗留系统服务,无法提供TCC模式要求的三个接口 |
seata下载https://github.com/apache/incubator-seata/releases选择自己需要的版本下载。
配置seata的config和registry类型为nacos,并把nacos相关的配置信息完善;
- server:
- port: 7091
-
- spring:
- application:
- name: seata-server
-
- logging:
- config: classpath:logback-spring.xml
- file:
- path: ${user.home}/logs/seata
- extend:
- logstash-appender:
- destination: 127.0.0.1:4560
- kafka-appender:
- bootstrap-servers: 127.0.0.1:9092
- topic: logback_to_logstash
-
- console:
- user:
- username: seata
- password: seata
-
-
- seata:
- config:
- # support: nacos, consul, apollo, zk, etcd3
- type: nacos
- nacos:
- server-addr: 192.168.0.74:8847
- namespace: rfid
- group: SEATA_GROUP
- username: nacos
- password: nacos
- data-id: seataServer.properties
-
- registry:
- # support: nacos, eureka, redis, zk, consul, etcd3, sofa
- type: nacos
- nacos:
- application: seata-server
- server-addr: 192.168.0.74:8847
- group: SEATA_GROUP
- namespace: rfid
- cluster: default
- username: nacos
- password: nacos
-
- # store:
- # support: file 、 db 、 redis
- # mode: file
- # server:
- # service-port: 8091 #If not configured, the default is '${server.port} + 1000'
- security:
- secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
- tokenValidityInMilliseconds: 1800000
- ignore:
- urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
具体配置代码给出如下:
- #公共部分
- transport.serialization=seata
- transport.compressor=none
- transport.heartbeat=true
- registry.type=nacos
- config.type=nacos
- #server端
- server.undo.logSaveDays=7
- server.undo.logDeletePeriod=86400000
- server.maxCommitRetryTimeout=-1
- server.maxRollbackRetryTimeout=-1
- server.recovery.committingRetryPeriod=1000
- server.recovery.asynCommittingRetryPeriod=1000
- server.recovery.rollbackingRetryPeriod=1000
- server.recovery.timeoutRetryPeriod=1000
- #存储模式选db
- store.mode=db
- store.db.datasource=druid
- store.db.dbType=mysql
- store.db.driverClassName=com.mysql.cj.jdbc.Driver
- #这里url跟上rewriteBatchedStatements=true,原因看官网-参数配置-附录7,增加批量插入效率
- store.db.url=jdbc:mysql://192.168.10.23:19131/seata?useUnicode=true$rewriteBatchedStatements=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true
- store.db.user=root
- store.db.password=Digitalor@MCRFID1688
-
- #默认1和20,稍微调大点
- store.db.minConn=5
- store.db.maxConn=30
- store.db.maxWait=5000
- store.db.globalTable=global_table
- store.db.branchTable=branch_table
- store.db.lockTable=lock_table
- store.db.queryLimit=100
- #监控,只支持prometheus
- metrics.enabled=false
- metrics.registryType=compact
- metrics.exporterList=prometheus
- metrics.exporterPrometheusPort=9898
- #client端
- seata.enabled=true
- seata.enableAutoDataSourceProxy=true
- seata.useJdkProxy=false
- transport.enableClientBatchSendRequest=true
- client.log.exceptionRate=100
- # 全局事务Mysql数据的自动清理,默认为false,防止数据过大
- service.disableGlobalTransaction=false
- client.tm.degradeCheck=false
- client.tm.degradeCheckAllowTimes=10
- client.tm.degradeCheckPeriod=2000
- client.rm.reportSuccessEnable=false
- client.rm.asyncCommitBufferLimit=10000
- client.rm.lock.retryInterval=10
- client.rm.lock.retryTimes=30
- client.rm.lock.retryPolicyBranchRollbackOnConflict=true
- client.rm.reportRetryCount=5
- client.rm.tableMetaCheckEnable=false
- #一阶段全局提交和回滚结果上报TC重试次数,默认1,这里改成3
- client.tm.commitRetryCount=3
- client.tm.rollbackRetryCount=3
- client.undo.dataValidation=true
- client.undo.logSerialization=jackson
- client.undo.logTable=undo_log
- client.undo.onlyCareUpdateColumns=true
- client.rm.sqlParserType=druid
- #自定义事务组my_tx_group
- service.vgroupMapping.my_tx_group=default
global_table:全局事务信息
branch_table :事务参与者信息(分支事务)
lock_table:全局锁
distributed_lock:分布式锁
- -- -------------------------------- The script used when storeMode is 'db' --------------------------------
- -- the table to store GlobalSession data
- CREATE TABLE IF NOT EXISTS `global_table`
- (
- `xid` VARCHAR(128) NOT NULL,
- `transaction_id` BIGINT,
- `status` TINYINT NOT NULL,
- `application_id` VARCHAR(32),
- `transaction_service_group` VARCHAR(32),
- `transaction_name` VARCHAR(128),
- `timeout` INT,
- `begin_time` BIGINT,
- `application_data` VARCHAR(2000),
- `gmt_create` DATETIME,
- `gmt_modified` DATETIME,
- PRIMARY KEY (`xid`),
- KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
- KEY `idx_transaction_id` (`transaction_id`)
- ) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4;
-
- -- the table to store BranchSession data
- CREATE TABLE IF NOT EXISTS `branch_table`
- (
- `branch_id` BIGINT NOT NULL,
- `xid` VARCHAR(128) NOT NULL,
- `transaction_id` BIGINT,
- `resource_group_id` VARCHAR(32),
- `resource_id` VARCHAR(256),
- `branch_type` VARCHAR(8),
- `status` TINYINT,
- `client_id` VARCHAR(64),
- `application_data` VARCHAR(2000),
- `gmt_create` DATETIME(6),
- `gmt_modified` DATETIME(6),
- PRIMARY KEY (`branch_id`),
- KEY `idx_xid` (`xid`)
- ) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4;
-
- -- the table to store lock data
- CREATE TABLE IF NOT EXISTS `lock_table`
- (
- `row_key` VARCHAR(128) NOT NULL,
- `xid` VARCHAR(128),
- `transaction_id` BIGINT,
- `branch_id` BIGINT NOT NULL,
- `resource_id` VARCHAR(256),
- `table_name` VARCHAR(32),
- `pk` VARCHAR(36),
- `status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
- `gmt_create` DATETIME,
- `gmt_modified` DATETIME,
- PRIMARY KEY (`row_key`),
- KEY `idx_status` (`status`),
- KEY `idx_branch_id` (`branch_id`),
- KEY `idx_xid` (`xid`)
- ) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4;
-
- CREATE TABLE IF NOT EXISTS `distributed_lock`
- (
- `lock_key` CHAR(20) NOT NULL,
- `lock_value` VARCHAR(20) NOT NULL,
- `expire` BIGINT,
- primary key (`lock_key`)
- ) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4;
-
- INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
- INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
- INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
- INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
- CREATE TABLE `undo_log` (
- `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
- `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
- `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
- `rollback_info` longblob NOT NULL COMMENT 'rollback info',
- `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
- `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
- `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
- UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';
在你的事务进行过程中,可以在client查到相关的事务信息,信息在事务结束后会被删除,是为了防止数据库无用数据过大,你也可以在配置里定义不删除。
我本地alibaba-cloud版本为 2.2.6.RELEASE,其对应的seata版本为1.3.0,但是想使用seata1.5.2的版本,故需要替换seata核心包
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
- <exclusions>
- <exclusion>
- <groupId>io.seata</groupId>
- <artifactId>seata-spring-boot-starter</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>io.seata</groupId>
- <artifactId>seata-spring-boot-starter</artifactId>
- <version>1.5.2</version>
- </dependency>
- # seata配置
- seata.enabled=true
- # 启用自动数据源代理功能
- seata.enable-auto-data-source-proxy=true
- # Seata 应用编号,默认为 ${spring.application.name}
- seata.application-id=${spring.application.name}
- # Seata 事务组编号,用于 TC 集群名
- seata.tx-service-group=my_tx_group
- seata.config.type=nacos
- seata.config.nacos.server-addr=192.168.0.74:8847
- seata.config.nacos.namespace=rfid
- seata.config.nacos.group=SEATA_GROUP
- seata.config.nacos.username=nacos
- seata.config.nacos.password=nacos
- seata.config.nacos.data-id=seataServer.properties
- seata.registry.type=nacos
- seata.registry.nacos.application=seata-server
- seata.registry.nacos.server-addr=192.168.0.74:8847
- seata.registry.nacos.namespace=rfid
- seata.registry.nacos.group=SEATA_GROUP
- seata.registry.nacos.cluster=default
- seata.registry.nacos.username=nacos
- seata.registry.nacos.password=nacos
- seata.service.vgroup-mapping.my_tx_group=default
在代码发生异常即会触发回滚
- @GlobalTransactional(timeoutMills = 10 * 60 * 1000, rollbackFor = Exception.class)
- public Result test(){
-
- }
代码示例:
- @GlobalTransactional(timeoutMills = 10 * 60 * 1000, rollbackFor = Exception.class)
- public Result batchPut(List<Demo> list) throws InterruptedException {
- CopyOnWriteArrayList<String> modelIdMessageList = new CopyOnWriteArrayList();
- CopyOnWriteArrayList<String> exceptionList = new CopyOnWriteArrayList();
-
- // 获取全局事务ID
- String xid = RootContext.getXID();
-
- CountDownLatch countDownLatch = new CountDownLatch(list.size());
- list.forEach(re->
- threadPoolTaskExecutor.execute(() -> {
-
- this.putA(
- countDownLatch,
- xid
- );
-
- })
- );
-
- // 等待子线程全部完成
- countDownLatch.await();
-
- // 消息队列补偿、redis补偿
- if (exceptionList.size() > 0) {
- ...
- throw new RuntimeException("user-defined GlobalTransactional rollback");
- }
- return Result.success();
- }
-
-
-
- public void putA(
- CountDownLatch countDownLatch,
- String xid
- ) {
- try {
- // 绑定全局事务ID
- RootContext.bind(xid);
- ...
- } catch (Exception e) {
-
- } finally {
- // 释放全局事务ID
- RootContext.unbind();
- // 线程信号量减一
- countDownLatch.countDown();
- }
- }
linux服务器搭建seata和本地没有太大的区别,只是需要把seata运行到服务器上就行,不管是以jar包的方式运行,还是以docker容器的方式运行。
1)、linux服务器下载seata-server-1.5.2.zip或者本地下载上传到服务器,解压zip(命令:unzip seata-server-1.5.2.zip);
2)、其他配置和本地配置一样,都改为服务器相关信息的配置即可;
3)、启动seata,运行seate\seata\bin\seata-server.sh;也可使用自定义脚本start_seata.sh启动,可直接使用也可参考优化;
- cd /home/jenkins/apps/seata/bin
- sh seata-server.sh -h 192.168.100.100 -p 8091
4)注意:服务器可能没定义JAVA_HOME,启动过程中可能会报错,使用命令定义
export JAVA_HOME=/path/to/java
操作基本类似,只是需要下载seata对应版本镜像在容器中运行,配置相关文件;
对自己在学习和应用seata AT模式的过程进行了一个梳理,也给自己加强一下印象,后续有机会的话会应用TCC模式并进行文章输出。SAGA模式也曾使用过,有机会也会整理一个使用思路出来。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。