当前位置:   article > 正文

分布式事务Seata的使用详解_seata使用

seata使用

 一、事务概述

事务指的就是一个操作单元,在这个操作单元中的所有操作最终要保持一致的行为,要么所有操作 都成功,要么所有的操作都被撤销。简单地说,事务提供一种“要么什么都不做,要么做全套”机制。

1.1.本地事物

本地事物其实可以认为是数据库提供的事务机制。说到数据库事务就不得不说,数据库事务中的四 大特性:

  • A:原子性(Atomicity),一个事务中的所有操作,要么全部完成,要么全部不完成

  • C:一致性(Consistency),在一个事务执行之前和执行之后数据库都必须处于一致性状态

  • I:隔离性(Isolation),在并发环境中,当不同的事务同时操作相同的数据时,事务之间互不影响

  • D:持久性(Durability),指的是只要事务成功结束,它对数据库所做的更新就必须永久的保存下来

数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单 元中的所有操作要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚

1.2.分布式事务

分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布 式系统的不同节点之上。 简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同 的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。 本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

1.2.1.分布式事务的场景

  • 单体系统访问多个数据库:个数据库实例完成数据的增删改操作

  • 多个微服务访问同一个数据库:多个服务需要调用一个数据库实例完成数据的增删改操作

  • 多个微服务访问多个数据库:多个服务需要调用一个数据库实例完成数据的增删改操作

二、分布式事务解决方案

2.1.全局事务

 全局事务基于DTP模型实现。DTP是由X/Open组织提出的一种分布式事务模型——X/Open Distributed Transaction Processing Reference Model。它规定了要实现分布式事务,需要三种角色:

  • AP: Application 应用系统 (微服务)

  • TM: Transaction Manager 事务管理器 (全局事务管理)

  • RM: Resource Manager 资源管理器 (数据库)

整个事务分成两个阶段:

  • 阶段一: 表决阶段,所有参与者都将本事务执行预提交,并将能否成功的信息反馈发给协调者。

  • 阶段二: 执行阶段,协调者根据所有参与者的反馈,通知所有参与者,步调一致地执行提交或者回滚。

优点

  • 提高了数据一致性的概率,实现成本较低

缺点

  • 单点问题: 事务协调者宕机

  • 同步阻塞: 延迟了提交时间,加长了资源阻塞时间

  • 数据不一致: 提交第二阶段,依然存在commit结果未知的情况,有可能导致数据不一致

2.2.可靠消息服务

基于可靠消息服务的方案是通过消息中间件保证上、下游应用数据操作的一致性。假设有A和B两个 系统,分别可以处理任务A和任务B。此时存在一个业务流程,需要将任务A和任务B在同一个事务中处理。就可以使用消息中间件来实现这种分布式事务。

 

第一步: 消息由系统A投递到中间件

  1. 在系统A处理任务A前,首先向消息中间件发送一条消息

  2. 消息中间件收到后将该条消息持久化,但并不投递。持久化成功后,向A回复一个确认应答

  3. 系统A收到确认应答后,则可以开始处理任务A

  4. 任务A处理完成后,向消息中间件发送Commit或者Rollback请求。该请求发送完成后,对系统A而 言,该事务的处理过程就结束了

  5. 如果消息中间件收到Commit,则向B系统投递消息;如果收到Rollback,则直接丢弃消息。但是 如果消息中间件收不到Commit和Rollback指令,那么就要依靠"超时询问机制"。

超时询问机制 系统A除了实现正常的业务流程外,还需提供一个事务询问的接口,供消息中间件调 用。当消息中间件收到发布消息便开始计时,如果到了超时没收到确认指令,就会主动调用 系统A提供的事务询问接口询问该系统目前的状态。该接口会返回三种结果,中间件根据三 种结果做出不同反应: 提交:将该消息投递给系统B 回滚:直接将条消息丢弃

处理中:继续等待

第二步: 消息由中间件投递到系统B 消息中间件向下游系统投递完消息后便进入阻塞等待状态,下游系统便立即进行任务的处理,任务 处理完成后便向消息中间件返回应答。

  • 如果消息中间件收到确认应答后便认为该事务处理完毕

  • 如果消息中间件在等待确认应答超时之后就会重新投递,直到下游消费者返回消费成功响应为止。

一般消息中间件可以设置消息重试的次数和时间间隔,如果最终还是不能成功投递,则需要手工干 预。这里之所以使用人工干预,而不是使用让A系统回滚,主要是考虑到整个系统设计的复杂度问 题。 基于可靠消息服务的分布式事务,前半部分使用异步,注重性能;后半部分使用同步,注重开发成本。

 2.3.最大努力通知

 最大努力通知也被称为定期校对,其实是对第二种解决方案的进一步优化。它引入了本地消息表来 记录错误消息,然后加入失败消息的定期校对功能,来进一步保证消息会被下游系统消费。

第一步: 消息由系统A投递到中间件

  1. 处理业务的同一事务中,向本地消息表中写入一条记录

  2. 准备专门的消息发送者不断地发送本地消息表中的消息到消息中间件,如果发送失败则重试

第二步: 消息由中间件投递到系统B

  1. 消息中间件收到消息后负责将该消息同步投递给相应的下游系统,并触发下游系统的任务执行

  2. 当下游系统处理成功后,向消息中间件反馈确认应答,消息中间件便可以将该条消息删除,从而该 事务完成

  3. 对于投递失败的消息,利用重试机制进行重试,对于重试失败的,写入错误消息表

  4. 消息中间件需要提供失败消息的查询接口,下游系统会定期查询失败消息,并将其消费

这种方式的优缺点:

  • 优点: 一种非常经典的实现,实现了最终一致性。

  • 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

2.4.TCC事务

TCC即为Try Confirm Cancel,它属于补偿型分布式事务。TCC实现分布式事务一共有三个步骤:

  • Try:尝试待执行的业务 这个过程并未执行业务,只是完成所有业务的一致性检查,并预留好执行所需的全部资源

  • Confirm:确认执行业务 确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。通常情况下,采用TCC 则认为 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的 出错了,需引入重试机制或人工处理。

  • Cancel:取消待执行的业务 取消Try阶段预留的业务资源。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若 Cancel阶段真的出错了,需引入重试机制或人工处理。

TCC两阶段提交与XA两阶段提交的区别是: XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁。 TCC是业务层面的分布式事务,最终一致性,不会一直持有资源的锁。 TCC事务的优缺点:

  • 优点:把数据库层的二阶段提交上提到了应用层来实现,规避了数据库层的2PC性能低下问题。

  • 缺点:TCC的Try、Confirm和Cancel操作功能需业务提供,开发成本高。

 

三、Seata介绍

2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And Rollback),其愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们 遇到的分布式事务方面的所有难题。后来更名为 Seata,意为:Simple Extensible Autonomous Transaction Architecture,是一套分布式事务解决方案。 Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。 它把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分 支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据 库的本地事务。

2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。

Seata主要由三个重要组件组成:

  • TC:Transaction Coordinator 事务协调器,管理全局的分支事务的状态,用于全局性事务的提交 和回滚。

  • TM:Transaction Manager 事务管理器,用于开启、提交或者回滚全局事务。

  • RM:Resource Manager 资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分 支事务的状态,接受TC的命令来提交或者回滚分支事务。

用例说明:

用户购买商品的业务逻辑:

  • Storage服务:对给定的商品扣除仓储数量。

  • Order服务:根据采购需求创建订单。

  • Account服务:从用户帐户中扣除余额。

  • Business服务:创建订单的同时需完成对商品库存扣减及用户账号余额扣除操作。

 

Seata的执行流程如下:

  1. Business业务服务TM申请向TC开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID

  2. Storage服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖

  3. Storage服务执行分支事务,向数据库做操作,对给定的商品扣除仓储数量

  4. Order服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖

  5. Order服务执行分支事务,向数据做操作,创建订单

  6. Order服务开始远程调用Account服务,此时XID会在微服务的调用链上传播

  7. Account服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖

  8. Account服务执行分支事务,向数据库做操作,从用户账户中扣除余额

  9. 全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚

  10. TC协调其管辖之下的所有分支事务, 决定是否回滚

Seata实现2PC与传统2PC的差别:

  1. 架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM本质上就是数据库自身,通过XA协 议实现,而 Seata的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。

  2. 两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保 持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2 持锁的时间,整体提高效率。

四、案例讲解Seata实现分布式事务

通过Seata中间件实现分布式事务,模拟电商中的下单和扣库存的过程 我们通过订单微服务执行下单操作,然后由订单微服务调用商品微服务扣除库存。

1.product微服务

IProductService接口

  1. public interface IProductService extends IService<Product> {
  2.   void updateStock(Integer pid,Integer num);
  3. }

ProductServiceImpl

  1. @Service
  2. public class ProductServiceImpl extends ServiceImpl<ProductMapper, Product> implements IProductService {
  3.   @Transactional
  4.   @Override
  5.   public void updateStock(Integer pid, Integer num) {
  6.       //根据商品pid获取商品信息
  7.       Product product = this.getById(pid);
  8.       //判断库存商品是否大于购物商品数量
  9.       if(product.getStock()>num){
  10.           product.setStock(product.getStock()-num);
  11.           //根据商品pid修改商品数量
  12.           this.updateById(product);
  13.       }else{
  14.           throw new RuntimeException("库存不足");
  15.       }
  16.   }
  17. }

ProductController

  1. @Controller
  2. public class ProductController {
  3.   @RequestMapping("/updateStock/{pid}/{num}")
  4.   @ResponseBody
  5.   public void updateStock(@PathVariable("pid") Integer pid,
  6.                           @PathVariable("num") Integer num){
  7.       productService.updateStock(pid,num);
  8.   }
  9. }

2.order微服务

order微服务启动类配置上配置@EnableFeignClients

ApiProductService

  1. @FeignClient("zmall-product")
  2. public interface ApiProductService {
  3. @RequestMapping("/updateStock/{pid}/{num}")
  4. void updateStock(@PathVariable("pid") Integer pid,
  5. @PathVariable("num") Integer num);
  6. }

IOrderService

  1. public interface IOrderService extends IService<Order> {
  2.   Order createOrder(Integer pid,Integer num);
  3. }

OrderServiceImpl

  1. @Service
  2. public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {
  3.   @Autowired
  4.   private ApiProductService productService;
  5.   @Transactional
  6.   @Override
  7.   public Order createOrder(Integer pid, Integer num) {
  8.       //根据商品ID修改商品对应的库存
  9.       productService.updateStock(pid,num);
  10.       //新增订单
  11.       Order order=new Order();
  12.       //此处只是做模拟操作
  13.       this.save(order);
  14.       return order;
  15.   }
  16. }

OrderController

  1. @Controller
  2. public class OrderController {
  3.   @Autowired
  4.   private IOrderService orderService;
  5.   @RequestMapping("/createOrder/{pid}/{num}")
  6.   @ResponseBody
  7.   public Order createOrder(@PathVariable("pid") Integer pid,
  8.                             @PathVariable("num") Integer num){
  9.       return orderService.createOrder(pid,num);
  10.   }
  11. }

异常模拟

在OrderServiceImpl的代码中模拟一个异常

现在我们就来模拟测试一下商品数量修改完了但是订单生成失败了,现在这个香奈儿的数量是93

我们进行访问一下

商品的数量减少了但是没有生成订单

 原因就是刚刚所模拟的错误,所以这不符合我们的需求。

 

3.启动Seata

下载seata

下载地址:https://github.com/seata/seata/releases/v1.3.0/

修改配置文件及初始化

  • seata-server-1.4.0.zip

将下载得到的seata-server-1.4.0.zip(非源码包)压缩包进行解压,进入conf目录,修改file.conf

1.将mode=“type”修改成mode="db"

2.设置数据库名、用户账号及密码

 创建数据库Seata并初始化数据表

解压seata-1.4.0源码包,并进入到seata-1.4.0\script\server\db目录,复制运行mysql.sql脚本完成seata服务端数据库初始化工作。

修改registry.conf

registry:指定注册中心,将seata-server注册到指定位置

config: 指定配置中心  

 配置seata-1.4.0.zip(源码包)

修改seata-1.4.0中script/config-center目录中的config.txt配置:

初始化Seata配置到Nacos中

在seata-1.4.0\script\config-center\nacos目录中右键选择git bash here,运行git命令窗口。并输入以下命令

sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -u nacos -w nacos

执行成功后可以打开Nacos的控制台,在配置列表中,可以看到初始化了很多Group为SEATA_GROUP 的配置。  

启动seata服务

直接进入seata服务的seata\bin目录下,双击运行seata-server.bat文件即可。或者使用以下命令方式运行

  1. cd bin
  2. seata-server.bat -p 9000 -m file
  3. seata-server.bat -h ip地址 -p 9000 -m file

-p 9000:指定监听端口,默认为8091

-m file: 模式

启动后在 Nacos 的服务列表下面可以看到一个名为 serverAddr 的服务。

 4.使用Seata实现事务控制

初始化数据表

进入源码包seata-1.4.0\script\client\at\db目录,复制并运行mysql.sql数据库脚本完成undo_log表创建,这是Seata记录事务日志要用到的表。

  1. CREATE TABLE `undo_log`
  2. (
  3. `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
  4. `branch_id` BIGINT(20) NOT NULL,
  5. `xid` VARCHAR(100) NOT NULL,
  6. `context` VARCHAR(128) NOT NULL,
  7. `rollback_info` LONGBLOB NOT NULL,
  8. `log_status` INT(11) NOT NULL,
  9. `log_created` DATETIME NOT NULL,
  10. `log_modified` DATETIME NOT NULL,
  11. `ext` VARCHAR(100) DEFAULT NULL,
  12. PRIMARY KEY (`id`),
  13. UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
  14. ) ENGINE = INNODB
  15. AUTO_INCREMENT = 1
  16. DEFAULT CHARSET = utf8;
添加配置
  • 添加依赖

在订单模块和商品模块中分别引入以下依赖:

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  4. </dependency>
  5. <!-- 如果已经添加了nacos配置中心依赖,则可以不加入 -->
  6. <dependency>
  7. <groupId>com.alibaba.cloud</groupId>
  8. <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
  9. </dependency>
  • DataSourceProxyConfig

Seata 是通过代理数据源实现事务分支的,所以需要配置io.seata.rm.datasource.DataSourceProxy 的Bean,且是 @Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。请在订单模块中添加DruidDataSource配置类,具体如下所示:

  1. import com.alibaba.druid.pool.DruidDataSource;
  2. import io.seata.rm.datasource.DataSourceProxy;
  3. import io.seata.rm.datasource.xa.DataSourceProxyXA;
  4. import org.springframework.boot.context.properties.ConfigurationProperties;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.context.annotation.Primary;
  8. import javax.sql.DataSource;
  9. @Configuration
  10. public class DataSourceProxyConfig {
  11. @Bean
  12. @ConfigurationProperties(prefix = "spring.datasource")
  13. public DruidDataSource druidDataSource() {
  14. return new DruidDataSource();
  15. }
  16. @Primary
  17. @Bean("dataSourceProxy")
  18. public DataSource dataSource(DruidDataSource druidDataSource) {
  19. //AT模式
  20. return new DataSourceProxy(druidDataSource);
  21. //XA模式
  22. //return new DataSourceProxyXA(druidDataSource);
  23. }
  24. }

在启动类上排除DataSource数据源自动配置类  

  1. @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
  2. @EnableFeignClients
  3. @EnableDiscoveryClient
  4. @MapperScan("org.csdn.order.mapper")
  5. public class OrderApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(OrderApplication.class, args);
  8. }
  9. }

 在需要进行分布式事务的微服务中进行下面几项配置:

  • application.yml

在需要进行分布式事务的各个微服务中的application.yml数据源更新成阿里巴巴的DruidDataSource

  1. spring:
  2. datasource:
  3.   #type连接池类型 DBCP,C3P0,Hikari,Druid,默认为Hikari
  4.   type: com.alibaba.druid.pool.DruidDataSource
  5.   driver-class-name: com.mysql.jdbc.Driver
  6.   url: jdbc:mysql://localhost:3306/zmall?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
  7.   username: root
  8.   password: 1234
  • registry.conf

在微服务模块中的resources目录下添加seata的配置文件 registry.conf,该配置文件来自于seata-server/conf目录下的配置文件。

  1. registry {
  2.   type = "nacos"
  3.   nacos {
  4.       serverAddr = "localhost"
  5.       namespace = "public"
  6.       cluster = "default"
  7.   }
  8. }
  9. config {
  10.   type = "nacos"
  11.   nacos {
  12.       serverAddr = "localhost"
  13.       namespace = "public"
  14.       cluster = "default"
  15.   }
  16. }
  • bootstrap.yaml

  1. spring:
  2. application:
  3.   name: service-product
  4. cloud:
  5.   nacos:
  6.     config:
  7.       server-addr: localhost:8848 # nacos的服务端地址
  8.       namespace: public
  9.       group: SEATA_GROUP
  10.   alibaba:
  11.     seata:
  12.       tx-service-group: my_test_tx_group

请注意spring.cloud.alibaba.seata.tx-service-group=my_test_txgroup配置。这里的my_test_tx_group名称必须与seata源码包中的config.txt配置文件中的名字一致。重要,重要,重要!!!

请打开seata源码包目录seata-1.4.0\script\config-center\下的config.txt,如下:

  • 依赖
  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-bootstrap</artifactId>
  4. </dependency>
在order微服务开启全局事务
@GlobalTransactional  //seata全局事务控制

5.seata运行流程分析

要点说明: 1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连 接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务 操作就一定有undo_log。 2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成 就已经将分支事务提交,也就释放了锁资源。 3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支 事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。 4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事 务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。 5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应 的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失 败则会重试回滚操作。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号