当前位置:   article > 正文

SpringCloud 整合Seata 解决分布式事务_spring cloud+seata处理分布式事务

spring cloud+seata处理分布式事务

seata官网:http://seata.io/zh-cn/

前言

在当下微服务架构比较火热时,新一代微服务解决方案Spring Cloud Alibaba提供的开源分布式事务解决框架Seata无疑成为了我们在解决分布式事务时的首要之选,前面两篇文章分别介绍了常见的分布式解决方案和成熟的框架以及关于Seata概念的入门介绍,没有过分布式事务处理的小伙伴可以先有个大致的入门了解:

  • SpringCloud Alibaba微服务架构(十一)- 常见分布式事务解决方案及理论基础篇
  • SpringCloud Alibaba微服务架构(十二)- 分布式事务解决框架之Seata概念入门篇

那么在本篇Spring Cloud整合Seata之前,你必须要了解一下Spring Cloud Alibaba与Spring Boot、Spring Cloud之间的版本对应关系。

版本选择: Spring Cloud Alibaba与Spring Boot、Spring Cloud版本对应关系

一、版本要求

坑点1: 如果项目中使用了druid数据库连接池,引入的是SpringBoot的Starter依赖druid-spring-boot-starter,那么需要把druid-spring-boot-starter依赖换成druid1.1.23,因为seata源码中引入的druid依赖跟druid-spring-boot-starter的自动装配类冲突了,冲突的情况下项目启动出现异常,异常如下:

二、整合Seata环境配置

1. 下载seata-server-1.2.0和seata-1.2.0源码

seate-server下载: https://seata.io/zh-cn/blog/download.html,下载我们需要使用的seata1.2压缩包。

seata-1.2.0源码下载: https://github.com/seata/seata/releases

在这里插入图片描述

2. 创建undo_log日志表

在seata1.2源码seata-1.2.0\script\client\at\db目录下有提供针对mysql、oracle、postgresql这三种数据库生成undo-log逆向日志回滚表的表创建脚本。

  • 在你项目的参与全局事务的数据库中加入undo_log这张表。undo_log表脚本根据自身数据库类型来选择。
  1. - for AT mode you must to init this sql for you business database. the seata server not need it. 
  2. CREATE TABLE IF NOT EXISTS `undo_log` 
  3.     `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id'
  4.     `xid`           VARCHAR(100NOT NULL COMMENT 'global transaction id'
  5.     `context`       VARCHAR(128NOT NULL COMMENT 'undo_log context,such as serialization'
  6.     `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info'
  7.     `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status'
  8.     `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime'
  9.     `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime'
  10.     UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) 
  11. ) ENGINE = InnoDB 
  12.   AUTO_INCREMENT = 1 
  13.   DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table'

3.创建seata事务相关表

下载Seata1.2的源码后解压如上图,目前支持mysql、oracle、postgresql这三种数据库,上述三种脚本是针对Seata的Sever端在协调处理分布式事务时所需要的3张表,提供了不同数据库的global_table表、branch_table表、lock_table表创建脚本,根据自身数据库执行对应的sql脚本执行即可。

这里以mysql为例,在你的mysql数据库中创建名为seata的库,并执行以下sql,将会生成三张表:

  1. -- -------------------------------- The script used when storeMode is 'db' -------------------------------- 
  2. -- the table to store GlobalSession data 
  3. CREATE TABLE IF NOT EXISTS `global_table` 
  4.     `xid`                       VARCHAR(128NOT NULL
  5.     `transaction_id`            BIGINT
  6.     `status`                    TINYINT      NOT NULL
  7.     `application_id`            VARCHAR(32), 
  8.     `transaction_service_group` VARCHAR(32), 
  9.     `transaction_name`          VARCHAR(128), 
  10.     `timeout`                   INT
  11.     `begin_time`                BIGINT
  12.     `application_data`          VARCHAR(2000), 
  13.     `gmt_create`                DATETIME, 
  14.     `gmt_modified`              DATETIME, 
  15.     PRIMARY KEY (`xid`), 
  16.     KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), 
  17.     KEY `idx_transaction_id` (`transaction_id`) 
  18. ) ENGINE = InnoDB 
  19.   DEFAULT CHARSET = utf8; 
  20. -- the table to store BranchSession data 
  21. CREATE TABLE IF NOT EXISTS `branch_table` 
  22.     `branch_id`         BIGINT       NOT NULL
  23.     `xid`               VARCHAR(128NOT NULL
  24.     `transaction_id`    BIGINT
  25.     `resource_group_id` VARCHAR(32), 
  26.     `resource_id`       VARCHAR(256), 
  27.     `branch_type`       VARCHAR(8), 
  28.     `status`            TINYINT, 
  29.     `client_id`         VARCHAR(64), 
  30.     `application_data`  VARCHAR(2000), 
  31.     `gmt_create`        DATETIME(6), 
  32.     `gmt_modified`      DATETIME(6), 
  33.     PRIMARY KEY (`branch_id`), 
  34.     KEY `idx_xid` (`xid`) 
  35. ) ENGINE = InnoDB 
  36.   DEFAULT CHARSET = utf8; 
  37. -- the table to store lock data 
  38. CREATE TABLE IF NOT EXISTS `lock_table` 
  39.     `row_key`        VARCHAR(128NOT NULL
  40.     `xid`            VARCHAR(96), 
  41.     `transaction_id` BIGINT
  42.     `branch_id`      BIGINT       NOT NULL
  43.     `resource_id`    VARCHAR(256), 
  44.     `table_name`     VARCHAR(32), 
  45.     `pk`             VARCHAR(36), 
  46.     `gmt_create`     DATETIME, 
  47.     `gmt_modified`   DATETIME, 
  48.     PRIMARY KEY (`row_key`), 
  49.     KEY `idx_branch_id` (`branch_id`) 
  50. ) ENGINE = InnoDB 
  51.   DEFAULT CHARSET = utf8; 

4. 项目中引入seata依赖

4.1 如果微服务是SpringCloud

  1. <!-- 分布式事务seata包 --> 
  2. <!--seata begin--> 
  3. <dependency> 
  4.    <groupId>com.alibaba.cloud</groupId> 
  5.    <artifactId>spring-cloud-starter-alibaba-seata</artifactId> 
  6.    <version>2.1.3.RELEASE</version> 
  7.    <exclusions> 
  8.      <exclusion> 
  9.         <groupId>io.seata</groupId> 
  10.         <artifactId>seata-spring-boot-starter</artifactId> 
  11.      </exclusion>    
  12.    </exclusions> 
  13. </dependency> 
  14. <dependency> 
  15.     <groupId>io.seata</groupId> 
  16.     <artifactId>seata-spring-boot-starter</artifactId> 
  17.     <version>1.2.0</version> 
  18. </dependency> 
  19. <!--seata end--> 

4.2 如果微服务是Dubbo

  1. <dependency> 
  2.     <groupId>io.seata</groupId> 
  3.     <artifactId>seata-spring-boot-starter</artifactId> 
  4.     <version>1.2.0</version> 
  5. </dependency> 

5. 更改seata-server中的registry.conf

配置registry.conf注册中心为nacos,配置nacos相关属性参数。

  1. ##配置seata-server的注册中心,支持file 、nacos 、eureka、redis、zk、consul、etcd3、sofa 
  2. registry { 
  3.   # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa 
  4.   type = "nacos" 
  5.   nacos { 
  6.     application = "seata-server" 
  7.     serverAddr = "127.0.0.1:8848" 
  8.     group = "SEATA_GROUP" 
  9.     namespace = "public" 
  10.     username = "nacos" 
  11.     cluster = "default" 
  12.     password = "nacos" 
  13.   } 
  14.    
  15.   file { 
  16.     name = "file.conf" 
  17.   } 
  18. ##配置seata-server的配置中心,支持file、nacos 、apollo、zk、consul、etcd3 
  19. config { 
  20.   # file、nacos 、apollo、zk、consul、etcd3 
  21.   type = "nacos" 
  22.   nacos { 
  23.     serverAddr = "127.0.0.1:8848" 
  24.     namespace = "public" 
  25.     group = "SEATA_GROUP" 
  26.     username = "nacos" 
  27.     password = "nacos" 
  28.   } 
  29.   
  30.   file { 
  31.     name = "file.conf" 
  32.   } 

6. 修改seata-server中的file.config

配置file.config的DB模式相关参数配置。

  1. ##配置seata-server的数据存储方式,支持本地文档和数据库。 
  2. ## transaction log store, only used in seata-server 
  3. store { 
  4.   ## store mode: file、db、redis 
  5.   mode = "db" 
  6.   ## file store property 
  7.   file { 
  8.     ## store location dir 
  9.     dir = "sessionStore" 
  10.     # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions 
  11.     maxBranchSessionSize = 16384 
  12.     # globe session size , if exceeded throws exceptions 
  13.     maxGlobalSessionSize = 512 
  14.     # file buffer size , if exceeded allocate new buffer 
  15.     fileWriteBufferCacheSize = 16384 
  16.     # when recover batch read size 
  17.     sessionReloadReadSize = 100 
  18.     # async, sync 
  19.     flushDiskMode = async 
  20.   } 
  21.   ## database store property 
  22.   db { 
  23.     ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. 
  24.     datasource = "druid" 
  25.     ## mysql/oracle/postgresql/h2/oceanbase etc. 
  26.     dbType = "mysql" 
  27.     driverClassName = "com.mysql.jdbc.Driver" 
  28.     url = "jdbc:mysql://127.0.0.1:3306/seata" 
  29.     user = "root" 
  30.     password = "root" 
  31.     minConn = 5 
  32.     maxConn = 30 
  33.     globalTable = "global_table" 
  34.     branchTable = "branch_table" 
  35.     lockTable = "lock_table" 
  36.     queryLimit = 100 
  37.     maxWait = 5000 
  38.   } 
  39.   ## redis store property 
  40.   redis { 
  41.     host = "127.0.0.1" 
  42.     port = "6379" 
  43.     password = "" 
  44.     database = "0" 
  45.     minConn = 1 
  46.     maxConn = 10 
  47.     queryLimit = 100 
  48.   } 

7. 修改提交nacos脚本到nacos控制台

运行你下载的nacos,并参考:https://github.com/seata/seata/tree/develop/script/config-center 下的config.txt文件并修改:

  1. service.vgroupMapping.my_test_tx_group=default 
  2. store.mode=db 
  3. store.db.datasource=druid 
  4. store.db.dbType=mysql 
  5. store.db.driverClassName=com.mysql.jdbc.Driver 
  6. store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true 
  7. store.db.user=username 
  8. store.db.password=password 
  9. store.db.minConn=5 
  10. store.db.maxConn=30 
  11. store.db.globalTable=global_table 
  12. store.db.branchTable=branch_table 
  13. store.db.queryLimit=100 
  14. store.db.lockTable=lock_table 
  15. store.db.maxWait=5000 

运行仓库:https://github.com/seata/seata/tree/develop/script/config-center/nacos 中提供的nacos脚本nacos-config.sh,将以上信息提交到nacos控制台,如果有需要修改参数,可直接通过登录nacos控制台修改。

操作如下图:

8. application.yml配置

从官方github仓库:https://github.com/seata/seata/tree/develop/script/client 拿到参考配置做修改,加到你项目的application.yml文件中。

  1. #Seata分布式事务配置(AT模式) 
  2. seata: 
  3.   enabled: true 
  4.   application-id: ${spring.application.name} 
  5.   #客户端和服务端在同一个事务组 
  6.   tx-service-group: my_test_tx_group 
  7.   enable-auto-data-source-proxy: true 
  8.   service: 
  9.     vgroup-mapping: 
  10.       my_test_tx_group: default 
  11.   config: 
  12.     type: nacos 
  13.     nacos: 
  14.       namespace"public" 
  15.       serverAddr: 127.0.0.1:8848 
  16.       group: SEATA_GROUP 
  17.       username: "nacos" 
  18.       password: "nacos" 
  19.   #服务注册到nacos 
  20.   registry: 
  21.     type: nacos 
  22.     nacos: 
  23.       application: seata-server 
  24.       server-addr: 127.0.0.1:8848 
  25.       group: SEATA_GROUP 
  26.       namespace"public" 
  27.       username: "nacos" 
  28.       password: "nacos" 
  29.       cluster: default 
 

9. 运行seata-server

启动运行seata-server,成功后,运行自己的服务提供者,服务参与者。在全局事务调用者(发起全局事务的服务)的接口上加入@GlobalTransactional注解

到此为止,整合SpringCloud整合seata1.2及seata1.2整合nacos的配置与注册中心全部整合完成了。

三、项目准备

如果你经过前面的步骤搭建Seata环境完成了,那么你可以尝试一下启动项目,控制台无异常则搭建成功。

那么下面准备以Seata官方文档上的一个经典例子为题,模拟用户下单,创建订单同时扣减库存数量这一过程中产生的分布式事务问题,然后使用Seata解决,正好使用以下Seata的特性。

1. 订单服务

  • OrderController
  1. /** 
  2.  * @desc:  订单服务 
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:27 
  5.  */ 
  6. @RestController 
  7. @Slf4j 
  8. @RequestMapping("/order") 
  9. public class OrderController { 
  10.     @Autowired 
  11.     private OrderServiceImpl orderService; 
  12.     /** 
  13.      * 用户购买下单,模拟全局事务提交 
  14.      * @param pid 
  15.      * @return 
  16.      */ 
  17.     @RequestMapping("/purchase/commit/{pid}") 
  18.     public Order orderCommit(@PathVariable("pid") Integer pid) { 
  19.         return orderService.createOrderCommit(pid); 
  20.     } 
  21.     /** 
  22.      * 用户购买下单,模拟全局事务回滚 
  23.      * @param pid 
  24.      * @return 
  25.      */ 
  26.     @RequestMapping("/purchase/rollback/{pid}") 
  27.     public Order orderRollback(@PathVariable("pid") Integer pid) { 
  28.         return orderService.createOrderRollback(pid); 
  29.     } 
  • OrderServiceImpl
  1. /** 
  2.  * @desc
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:30 
  5.  */ 
  6. @Service 
  7. @Slf4j 
  8. public class OrderServiceImpl { 
  9.     @Autowired 
  10.     private OrderDao orderDao; 
  11.     @Autowired 
  12.     private ProductService productService; 
  13.     //用户下单,模拟全局事务提交 
  14.     public Order createOrderCommit(Integer pid) { 
  15.         log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid); 
  16.         //1 调用商品微服务,查询商品信息 
  17.         Product product = productService.findByPid(pid); 
  18.         log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product)); 
  19.         //2 下单(创建订单) 
  20.         Order order = new Order(); 
  21.         order.setUid(1); 
  22.         order.setUsername("测试用户"); 
  23.         order.setPid(pid); 
  24.         order.setPname(product.getPname()); 
  25.         order.setPprice(product.getPprice()); 
  26.         order.setNumber(1); 
  27.         orderDao.save(order); 
  28.         log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order)); 
  29.         //3 扣库存m 
  30.         productService.reduceInventoryCommit(pid, order.getNumber()); 
  31.         return order; 
  32.     } 
  33.     //用户下单,模拟全局事务回滚 
  34.     @GlobalTransactional//全局事务控制 
  35.     public Order createOrderRollback(Integer pid) { 
  36.         log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid); 
  37.         //1 调用商品微服务,查询商品信息 
  38.         Product product = productService.findByPid(pid); 
  39.         log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product)); 
  40.         //2 下单(创建订单) 
  41.         Order order = new Order(); 
  42.         order.setUid(1); 
  43.         order.setUsername("测试用户"); 
  44.         order.setPid(pid); 
  45.         order.setPname(product.getPname()); 
  46.         order.setPprice(product.getPprice()); 
  47.         order.setNumber(1); 
  48.         orderDao.save(order); 
  49.         log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order)); 
  50.         //3 扣库存m 
  51.         productService.reduceInventoryRollback(pid, order.getNumber()); 
  52.         return order; 
  53.     } 
  • 商品服务的Feign类ProductService
  1. /** 
  2.  * @desc
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:43 
  5.  */ 
  6. @FeignClient(value = "product-service",configuration = FeignRequestInterceptor.class) 
  7. public interface ProductService { 
  8.     //@FeignClient的value +  @RequestMapping的value值  其实就是完成的请求地址  "http://product-service/product/" + pid 
  9.     //指定请求的URI部分 
  10.     @RequestMapping("/product/product/{pid}") 
  11.     Product findByPid(@PathVariable Integer pid)
  12.     //扣减库存,模拟全局事务提交 
  13.     //参数一: 商品标识 
  14.     //参数二:扣减数量 
  15.     @RequestMapping("/product/reduceInventory/commit") 
  16.     void reduceInventoryCommit(@RequestParam("pid") Integer pid, 
  17.                                @RequestParam("number") Integer number); 
  18.     //扣减库存,模拟全局事务回滚 
  19.     //参数一: 商品标识 
  20.     //参数二:扣减数量 
  21.     @RequestMapping("/product/reduceInventory/rollback") 
  22.     void reduceInventoryRollback(@RequestParam("pid") Integer pid, 
  23.                          @RequestParam("number") Integer number); 

2. 商品服务

  • ProductController
  1. /** 
  2.  * @desc
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:16 
  5.  */ 
  6. @RestController 
  7. @Slf4j 
  8. @RequestMapping("/product") 
  9. public class ProductController { 
  10.     @Autowired 
  11.     private ProductService productService; 
  12.     /** 
  13.      * 扣减库存,正常->模拟全局事务提交 
  14.      * @param pid 
  15.      * @param number 
  16.      */ 
  17.     @RequestMapping("/reduceInventory/commit") 
  18.     public void reduceInventoryCommit(Integer pid, Integer number) { 
  19.         String token = ServletUtils.getRequest().getHeader("token"); 
  20.         log.info("从head请求头透传过来的值为token:"+ token); 
  21.         productService.reduceInventoryCommit(pid, number); 
  22.     } 
  23.     /** 
  24.      * 扣减库存,异常->模拟全局事务回滚 
  25.      * @param pid 
  26.      * @param number 
  27.      */ 
  28.     @RequestMapping("/reduceInventory/rollback") 
  29.     public void reduceInventoryRollback(Integer pid, Integer number) { 
  30.         productService.reduceInventoryRollback(pid, number); 
  31.     } 
  32.     //商品信息查询 
  33.     @RequestMapping("/product/{pid}") 
  34.     public Product product(@PathVariable("pid") Integer pid) { 
  35.         log.info("接下来要进行{}号商品信息的查询", pid); 
  36.         Product product = productService.findByPid(pid); 
  37.         log.info("商品信息查询成功,内容为{}", JSON.toJSONString(product)); 
  38.         return product; 
  39.     } 
  • ProductService接口类
  1. /** 
  2.  * @desc:  商品接口 
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:18 
  5.  */ 
  6. public interface ProductService { 
  7.     //根据pid查询商品信息 
  8.     Product findByPid(Integer pid)
  9.     //扣减库存,正常->模拟全局事务提交 
  10.     void reduceInventoryCommit(Integer pid, Integer number)
  11.     //扣减库存,异常->模拟全局事务回滚 
  12.     void reduceInventoryRollback(Integer pid, Integer number)
  • ProductServiceImpl 接口实现类
  1. /** 
  2.  * @desc:  商品服务实现类 
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:20 
  5.  */ 
  6. @Service 
  7. public class ProductServiceImpl implements ProductService { 
  8.     @Autowired 
  9.     private ProductDao productDao; 
  10.     @Override 
  11.     public Product findByPid(Integer pid) { 
  12.         return productDao.findById(pid).get(); 
  13.     } 
  14.     /** 
  15.      * 扣减库存,正常->模拟全局事务提交 
  16.      * @param pid 
  17.      * @param number 
  18.      */ 
  19.     @Override 
  20.     public void reduceInventoryCommit(Integer pid, Integer number) { 
  21.         //查询 
  22.         Product product = productDao.findById(pid).get(); 
  23.         //省略校验 
  24.         //内存中扣减 
  25.         product.setStock(product.getStock() - number); 
  26.         //保存扣减库存 
  27.         productDao.save(product); 
  28.     } 
  29.     /** 
  30.      * 扣减库存,异常->模拟全局事务回滚 
  31.      * @param pid 
  32.      * @param number 
  33.      */ 
  34.     @Transactional(rollbackFor = Exception.class)  //服务提供方本地事务注解 
  35.     @Override 
  36.     public void reduceInventoryRollback(Integer pid, Integer number) { 
  37.         //查询 
  38.         Product product = productDao.findById(pid).get(); 
  39.         //省略校验 
  40.         //内存中扣减 
  41.         product.setStock(product.getStock() - number); 
  42.         //模拟异常 
  43.         int i = 1 / 0
  44.         //保存扣减库存 
  45.         productDao.save(product); 
  46.     } 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/165669
推荐阅读
相关标签
  

闽ICP备14008679号