当前位置:   article > 正文

seata的快速入门和实战_seata入门

seata入门

一、介绍:

Seata是阿里巴巴旗下的产品,是一款开源的分布式事务解决方案,旨在解决分布式事务问题。

我们有必要先了解一下分布式事务:

在微服务体系中,每一个模块都有连接一个数据库,这一点与单体项目是不同的,单体项目就连接一个数据库。

那么如果有多个模块之间相互调用,怎样保证各个模块之间的事务一致性?由此引出了分布式事务。

在同一个数据库中,我们要保证事物的一致性是很简单的。因为MySQL是基于单机事物的,所以一旦遇到跨库的场景,那么MySQL数据库就无能为力了。在这种情景下,seata蕴育而生。

二、seata的下载和启动:

seata的官网:Apache Seata

seata的下载地址:

https://github.com/apache/incubator-seata/releases/download/v2.0.0/seata-server-2.0.0.zip

e4ea6f2337d549fe95bae2709f3d508a.png

有三个概念要先了解:

  1. TC是事务协调器(就是Seata本身),负责管理全局事务的执行过程。它生成全局唯一的事务ID,并协调各个分支事务的提交或回滚,以确保数据的一致性。有且仅有一个

  2. TM是事务管理器,(标注全局@GlobalTransactional启动入口动作的微服务模块)负责发起全局事务的提交或回滚请求,并与TC交互以执行相应操作。它与业务逻辑代码交互,触发分支事务的执行,并根据TC的指示来决定全局事务的最终状。有且仅有一个

  3. .RM是资源管理器,(各个模块的MySQL数据库本身)负责管理分支事务的资源,如数据库、消息队列、缓存等。它接收TC的指令并执行相应的事务操作,以确保分支事务的一致性。可以有多个

总的来说,TC负责全局事务的协调和管理,TM负责全局事务的发起和控制,RM负责具体资源的管理和事务操作。它们共同协作,实现了Seata分布式事务框架的核心功能,确保分布式系统中事务的一致性和可靠性。

下载完seata之后,要使用seata要现在本地的MySQL数据库中新建一个数据库seata,来记录seata在运行过程中的信息;

  1. CREATE DATABASE seata;
  2. USE seata;

sql脚本:
 

  1. -- -------------------------------- The script used when storeMode is 'db' --------------------------------
  2. -- the table to store GlobalSession data
  3. CREATE TABLE IF NOT EXISTS `global_table`
  4. (
  5. `xid` VARCHAR(128) NOT NULL,
  6. `transaction_id` BIGINT,
  7. `status` TINYINT NOT NULL,
  8. `application_id` VARCHAR(32),
  9. `transaction_service_group` VARCHAR(32),
  10. `transaction_name` VARCHAR(128),
  11. `timeout` INT,
  12. `begin_time` BIGINT,
  13. `application_data` VARCHAR(2000),
  14. `gmt_create` DATETIME,
  15. `gmt_modified` DATETIME,
  16. PRIMARY KEY (`xid`),
  17. KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
  18. KEY `idx_transaction_id` (`transaction_id`)
  19. ) ENGINE = InnoDB
  20. DEFAULT CHARSET = utf8mb4;
  21. -- the table to store BranchSession data
  22. CREATE TABLE IF NOT EXISTS `branch_table`
  23. (
  24. `branch_id` BIGINT NOT NULL,
  25. `xid` VARCHAR(128) NOT NULL,
  26. `transaction_id` BIGINT,
  27. `resource_group_id` VARCHAR(32),
  28. `resource_id` VARCHAR(256),
  29. `branch_type` VARCHAR(8),
  30. `status` TINYINT,
  31. `client_id` VARCHAR(64),
  32. `application_data` VARCHAR(2000),
  33. `gmt_create` DATETIME(6),
  34. `gmt_modified` DATETIME(6),
  35. PRIMARY KEY (`branch_id`),
  36. KEY `idx_xid` (`xid`)
  37. ) ENGINE = InnoDB
  38. DEFAULT CHARSET = utf8mb4;
  39. -- the table to store lock data
  40. CREATE TABLE IF NOT EXISTS `lock_table`
  41. (
  42. `row_key` VARCHAR(128) NOT NULL,
  43. `xid` VARCHAR(128),
  44. `transaction_id` BIGINT,
  45. `branch_id` BIGINT NOT NULL,
  46. `resource_id` VARCHAR(256),
  47. `table_name` VARCHAR(32),
  48. `pk` VARCHAR(36),
  49. `status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
  50. `gmt_create` DATETIME,
  51. `gmt_modified` DATETIME,
  52. PRIMARY KEY (`row_key`),
  53. KEY `idx_status` (`status`),
  54. KEY `idx_branch_id` (`branch_id`),
  55. KEY `idx_xid` (`xid`)
  56. ) ENGINE = InnoDB
  57. DEFAULT CHARSET = utf8mb4;
  58. CREATE TABLE IF NOT EXISTS `distributed_lock`
  59. (
  60. `lock_key` CHAR(20) NOT NULL,
  61. `lock_value` VARCHAR(20) NOT NULL,
  62. `expire` BIGINT,
  63. primary key (`lock_key`)
  64. ) ENGINE = InnoDB
  65. DEFAULT CHARSET = utf8mb4;

创建好库和表之后,更改seata的内容来启动seata:
在seata的conf文件夹下,找到application.yml;

4ecc071048434e2f9d5133bd4d62ae70.png

更改application.yml文件,启动seata:

  1. # Copyright 1999-2019 Seata.io Group.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. server:
  15. port: 7091
  16. spring:
  17. application:
  18. name: seata-server
  19. logging:
  20. config: classpath:logback-spring.xml
  21. file:
  22. path: ${log.home:${user.home}/logs/seata}
  23. extend:
  24. logstash-appender:
  25. destination: 127.0.0.1:4560
  26. kafka-appender:
  27. bootstrap-servers: 127.0.0.1:9092
  28. topic: logback_to_logstash
  29. console:
  30. user:
  31. username: seata
  32. password: seata
  33. seata:
  34. config:
  35. type: nacos
  36. # support: nacos, consul, apollo, zk, etcd3
  37. nacos:
  38. server-addr: 192.168.231.110:8848
  39. namespace:
  40. group: SEATA_GROUP
  41. username:
  42. password:
  43. context-path:
  44. ##if use MSE Nacos with auth, mutex with username/password attribute
  45. #access-key:
  46. #secret-key:
  47. data-id: seataServer.properties
  48. registry:
  49. # support: nacos, eureka, redis, zk, consul, etcd3, sofa
  50. type: nacos
  51. # preferred-networks: 30.240.*
  52. nacos:
  53. application: seata-server
  54. server-addr: 192.168.231.110:8848
  55. group: SEATA_GROUP
  56. namespace:
  57. cluster: default
  58. username:
  59. password:
  60. context-path:
  61. store:
  62. mode: db
  63. # support: file 、 db 、 redis 、 raft
  64. db:
  65. datasource: druid
  66. db-type: mysql
  67. driver-class-name: com.mysql.cj.jdbc.Driver
  68. url: jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true
  69. user: root
  70. password: 123456
  71. min-conn: 10
  72. max-conn: 100
  73. global-table: global_table
  74. branch-table: branch_table
  75. lock-table: lock_table
  76. distributed-lock-table: distributed_lock
  77. query-limit: 1000
  78. max-wait: 5000
  79. # server:
  80. # service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  81. security:
  82. secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
  83. tokenValidityInMilliseconds: 1800000
  84. ignore:
  85. urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/**

根据你nacos、MySQL表的信息,进行相应的更改;

先启动nacos、再启动seata;(seata的启动在bin下的seata-server.bat)

可以看到seata已经作为一个模块注册进了nacos中;

2c49518fbd97488f9e1709de9152eff5.png

访问7091端口,可以看到seata的图形化界面(用户名和密码都是seata)

8c08816e56f74a7c90039ff99b2148b2.png

三、在spring cloud微服务中使用seata

版本:spring boot3.1.15、spring cloud2022、spring cloud alibaba2022.0.0.0-RC2、nacos2.2.2、seata2.0

注意版本的搭配是非常重要的!!!

版本发布说明 | Spring Cloud Alibaba

本次演示seata使用到三个模块、三个数据库、三张表;

一个简单的业务代码如下:

一个订单表下订单、然后订单模块调用库存模块减库存,库存模块调用用户模块扣余额,最终再修改订单的状态;

下订单 --->减库存---->扣余额---->修改(订单)状态

创建三个数据库order、storage、user;并在这三个数据库中创建相应的表单并插入数据;(需要特别说明的是,要使用seata做分布式事务管理,需要再每一个模块绑定的数据库中新建一个undo_log表,这是seata官方强制要求的。这张表用来进行日志的记录和数据的回滚)

建表语句如下:

  1. -- for AT mode you must to init this sql for you business database. the seata server not need it.
  2. CREATE TABLE IF NOT EXISTS `undo_log`
  3. (
  4. `branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
  5. `xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
  6. `context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  7. `rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
  8. `log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
  9. `log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
  10. `log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
  11. UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
  12. ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
  13. ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);

接下来编写相应的代码来实现seata的分布式事务(其实就一个)

1、在每个模块中引入seata的依赖:

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  4. </dependency>

2、将这三个模块都注册进nacos;加上seata-server应该是四个服务。如果有gateway网关就是5个服务

3、在OrderController中新建一个get方法,用来生成一个订单;

  1. @RestController
  2. public class OrderController {
  3. @Resource
  4. private OrderService orderService;
  5. /**
  6. * 创建订单
  7. */
  8. @GetMapping("/order/create")
  9. public ResultData create(Order order)
  10. {
  11. orderService.create(order);
  12. return ResultData.success(order);
  13. }
  14. }

在service实现类中编写相应的代码,(主要是@GlobalTransactional注解的使用)

  1. @Slf4j
  2. @Service
  3. public class OrderServiceImpl implements OrderService
  4. {
  5. @Resource
  6. private OrderMapper orderMapper;
  7. @Resource//订单微服务通过OpenFeign去调用库存微服务
  8. private StorageFeignApi storageFeignApi;
  9. @Resource//订单微服务通过OpenFeign去调用账户微服务
  10. private AccountFeignApi accountFeignApi;
  11. @Override
  12. @GlobalTransactional(name = "zzyy-create-order",rollbackFor = Exception.class) //AT
  13. //@GlobalTransactional @Transactional(rollbackFor = Exception.class) //XA
  14. public void create(Order order) {
  15. //xid检查
  16. String xid = RootContext.getXID();
  17. //1. 新建订单
  18. log.info("==================>开始新建订单"+"\t"+"xid_order:" +xid);
  19. //订单状态status0:创建中;1:已完结
  20. order.setStatus(0);
  21. int result = orderMapper.insertSelective(order);
  22. //插入订单成功后获得插入mysql的实体对象
  23. Order orderFromDB = null;
  24. if(result > 0)
  25. {
  26. orderFromDB = orderMapper.selectOne(order);
  27. //orderFromDB = orderMapper.selectByPrimaryKey(order.getId());
  28. log.info("-------> 新建订单成功,orderFromDB info: "+orderFromDB);
  29. System.out.println();
  30. //2. 扣减库存
  31. log.info("-------> 订单微服务开始调用Storage库存,做扣减count");
  32. storageFeignApi.decrease(orderFromDB.getProductId(), orderFromDB.getCount());
  33. log.info("-------> 订单微服务结束调用Storage库存,做扣减完成");
  34. System.out.println();
  35. //3. 扣减账号余额
  36. log.info("-------> 订单微服务开始调用Account账号,做扣减money");
  37. accountFeignApi.decrease(orderFromDB.getUserId(), orderFromDB.getMoney());
  38. log.info("-------> 订单微服务结束调用Account账号,做扣减完成");
  39. System.out.println();
  40. //4. 修改订单状态
  41. //订单状态status0:创建中;1:已完结
  42. log.info("-------> 修改订单状态");
  43. orderFromDB.setStatus(1);
  44. Example whereCondition=new Example(Order.class);
  45. Example.Criteria criteria=whereCondition.createCriteria();
  46. criteria.andEqualTo("userId",orderFromDB.getUserId());
  47. criteria.andEqualTo("status",0);
  48. int updateResult = orderMapper.updateByExampleSelective(orderFromDB, whereCondition);
  49. log.info("-------> 修改订单状态完成"+"\t"+updateResult);
  50. log.info("-------> orderFromDB info: "+orderFromDB);
  51. }
  52. System.out.println();
  53. log.info("==================>结束新建订单"+"\t"+"xid_order:" +xid);
  54. }
  55. }

那两个模块中的内容都差不多,我就不在这粘了、浪费地方。通过OpenFeign来实现远程调用这两个模块

 一次监控中会生成一个唯一的xid,

String xid = RootContext.getXID();

如果一切都正常的话,当然是会生成一个订单数据,并且相应的库存和余额也会减少;

但如果出现故障的话,如果不加@GlobalTransactional注解,那么到哪一步出现故障,之前的数据库操作正常,后续的数据库就直接异常了。假如我们在最后一个user模块出现的异常,导致程序暂停,那么会出现生成一个状态为0的订单,并且响应的库存减少、但用户余额不变。这显然是一个重大的bug。但如果我们加上的@GlobalTransactional注解,那么本次操作出现异常的话,三张数据库都会回滚。

(注意seata分布式事务起作用时,会给所有参与的模块的数据库资源,也就是RM,加锁。在这期间是不允许修改响应的数据库的)

undo_log表是seata官方要求我们在每一个数据库中都要建立的一张表;它的作用是:
如果正常情况没有异常,那么正常提交;如果出现异常,那么undo_log表会记录上一次数据库的数据,以便于异常之后的回滚;

四、总结:

我们之前的步骤都是建立在seata的AT模式上;

AT 模式是 Seata 创新的一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。

整体机制

两阶段提交协议的演变:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
  • 二阶段:
    • 提交异步化,非常快速地完成。
    • 回滚通过一阶段的回滚日志进行反向补偿。

在一阶段,Seata 会拦截“业务 SQL”,

1  解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,

2  执行“业务 SQL”更新业务数据,在业务数据更新之后,

3  其保存成“after image”,最后生成行锁。

以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

1d9682c3d0764438b93d426675f8e726.png

二阶段分为两种情况:

1、正常提交:

二阶段如是顺利提交的话,

因为“业务 SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。

81310ceb73094a0e8fd59a286b9f78c3.png

2、异常回滚:

二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。

回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,

如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

a48275f6955343c99d6d217a9af2148a.png

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/846158
推荐阅读
相关标签
  

闽ICP备14008679号