赞
踩
推荐: 分布式事务( 图解 + 秒懂 + 史上最全 )
在单机单库的时候, 有本地事务可以保证数据的一致性; 但在分布式架构中, 原来的单体应用被拆分成多个微服务应用且都有各自独立的数据源, 一个业务场景的完成可能需要不同微服务模块和不同的库来完成. 此时每个微服务模块内部的数据一致性由本地事务来保证, 但是全局的数据一致性问题没有办法协调保证.
比如: 用户购买商品的业务逻辑, 需要由3个微服务提供支持.
架构图:
一句话: 一次业务操作需要跨多个数据源或者跨多个系统进行远程调用, 就会产生分布式事务问题.
Seata是一款开源的分布式事务解决方案, 致力于在微服务架构下提供高性能和简单易用的分布式事务服务.
→Seata官网
一个典型的分布式事务过程: 全局事务ID + 三组件模型
模型图:
处理过程:
→Seata官网下载
→Seata在GitHub的发布说明
→GitHub下载Seata
以前本地事务我们用Spring提供的注解 @Transactionnal 开启.
Seata也给我们提供了开启全局事务的注解 @GlobalTransactional.
我们只需要在业务方法上使用一个@GlobalTransactional注解.
我下载的版本是 seata-server-0.9.0 , 大家可以选择更新的版本.
将seata-server-0.9.0.zip解压到指定目录并修改conf目录下的file.conf
配置文件.(先备份file.conf文件后再修改)
修改内容: 自定义事务组名称 + 事务日志存储模式为db + 数据库连接信息.
在service模块中修改自定义事务组名称
service {
#vgroup->rgroup
# 自定义事务组名称 fsp_tx_group, 默认default
vgroup_mapping.my_test_tx_group = "fsp_tx_group"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
store模块修改事务日志存储模式为db
, 并修改数据库连接信息.
store { ## store mode: file、db # 事务日志存储模式 mode = "db" ## file store file { dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 # async, sync flush-disk-mode = async } ## database store 数据库连接信息 db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "dbcp" ## mysql/oracle/h2/oceanbase etc. db-type = "mysql" driver-class-name = "com.mysql.jdbc.Driver" url = "jdbc:mysql://127.0.0.1:3306/seata" user = "root" password = "root" min-conn = 1 max-conn = 3 global.table = "global_table" branch.table = "branch_table" lock-table = "lock_table" query-limit = 100 } }
在mysql中创建seata库, 然后分别创建全局事务表global_table, 分支事务表branch_table和事务锁表lock_table. 这三张表不用自己设计, 在下载的seata包中的conf目录下有一个db_store.sql
文件, 到mysql客户端中执行即可.
-- the table to store GlobalSession data drop table if exists `global_table`; create table `global_table` ( `xid` varchar(128) not null, `transaction_id` bigint, `status` tinyint not null, `application_id` varchar(32), `transaction_service_group` varchar(32), `transaction_name` varchar(128), `timeout` int, `begin_time` bigint, `application_data` varchar(2000), `gmt_create` datetime, `gmt_modified` datetime, primary key (`xid`), key `idx_gmt_modified_status` (`gmt_modified`, `status`), key `idx_transaction_id` (`transaction_id`) ); -- the table to store BranchSession data drop table if exists `branch_table`; create table `branch_table` ( `branch_id` bigint not null, `xid` varchar(128) not null, `transaction_id` bigint , `resource_group_id` varchar(32), `resource_id` varchar(256) , `lock_key` varchar(128) , `branch_type` varchar(8) , `status` tinyint, `client_id` varchar(64), `application_data` varchar(2000), `gmt_create` datetime, `gmt_modified` datetime, primary key (`branch_id`), key `idx_xid` (`xid`) ); -- the table to store lock data drop table if exists `lock_table`; create table `lock_table` ( `row_key` varchar(128) not null, `xid` varchar(96), `transaction_id` long , `branch_id` long, `resource_id` varchar(256) , `table_name` varchar(32) , `pk` varchar(36) , `gmt_create` datetime , `gmt_modified` datetime, primary key(`row_key`) );
修改conf目录下的registry.conf配置文件(先备份), 修改注册中心配置. 注册中心支持nacos 、eureka、redis、zk、consuld等.
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
# 注册中心修改为nacos
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
............ 此处省略完整配置
}
先启动nacos, 再启动seata-server, 否则报 no available server to connect.
/nacos/bin/startup.cmd
/seata-0.9.0/bin/seata-server.bat
分布式事务说明, 我们这里需要创建三个服务, 订单服务, 库存服务, 账户服务.
当用户下订单时, 订单服务会创建一个订单, 然后通过远程调用库存服务来扣减下单商品的库存, 再通过远程调用账户服务来扣减账户余额, 最后在订单服务中修改订单状态为已完成. 该操作跨三个数据库, 有两次远程调用, 很明显会有分布式事务问题.
场景: 下订单 -> 扣减库存 -> 扣减账户余额
三个数据库: 订单数据库(seata_order), 库存数据库(seata_storage), 账户数据库(seata_account).
业务数据库SQL
CREATE DATABASE seata_order;
CREATE DATABASE seata_storage;
CREATE DATABASE seata_account;
在seata_order库中创建订单表 t_order.
CREATE TABLE t_order (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`count` INT(11) DEFAULT NULL COMMENT '数量',
`money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
`status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结'
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
在seata_storage库中创建库存表 t_storage
CREATE TABLE t_storage (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`total` INT(11) DEFAULT NULL COMMENT '总库存',
`used` INT(11) DEFAULT NULL COMMENT '已用库存',
`residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
-- 初始库存
INSERT INTO seata_storage.t_storage(`id`, `product_id`, `total`, `used`, `residue`)
VALUES ('1', '1', '100', '0', '100');
在seata_account库中创建账户表 t_account
CREATE TABLE t_account (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
`used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
`residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
-- 初始化账户信息
INSERT INTO seata_account.t_account(`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');
在上面三个业务数据库seata_order, seata_storage, seata_account中分别创建各自的事务回滚日志表. 执行seata包中conf目录下的db_undo_log.sql
文件.
-- the table to store seata xid data -- 0.7.0+ add context -- you must to init this sql for you business databese. the seata server not need it. -- 此脚本必须初始化在你当前的业务数据库中,用于AT 模式XID记录。与server端无关(注:业务数据库) -- 注意此处0.3.0+ 增加唯一索引 ux_undo_log drop table `undo_log`; CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
各模块的数据库表准备完成后, 看一下结构.
业务需求, 下订单 -> 减库存 -> 扣余额 -> 改订单状态
创建订单模块 seata-order-service-2001
, pom依赖文件.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>atguigu-cloud-2020</artifactId> <groupId>com.atguigu.springcloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>seata-order-service-2001</artifactId> <dependencies> <!--自定义公共依赖--> <dependency> <groupId>com.atguigu.springcloud</groupId> <artifactId>cloud-api-commons</artifactId> <version>${project.version}</version> </dependency> <!--nacos--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!--seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <exclusions> <exclusion> <artifactId>seata-all</artifactId> <groupId>io.seata</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> <version>0.9.0</version> </dependency> <!--feign--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <!--web-actuator--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--mysql-druid--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.37</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies> </project>
yml配置
server: port: 2001 spring: application: name: seata-order-service cloud: alibaba: seata: #自定义事务组名称需要与seata-server中的对应 tx-service-group: fsp_tx_group nacos: discovery: server-addr: localhost:8848 datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_order?serverTimezone=Asia/Shanghai username: root password: root feign: hystrix: enabled: false logging: level: io: seata: info # 自定义属性 mybatis: mapperLocations: classpath:mapper/*.xml
将file.conf和registry.conf复制到项目类路径下.
然后修改一下file.conf文件的事务组名称和seata-server中配置的保持一致.
service {
#vgroup->rgroup
#修改自定义事务组名称, 我难得改,就用默认的default. 映射到seata-server的fsp_tx_group
vgroup_mapping.fsp_tx_group = "default"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
编写Order订单实体类
/** * 类描述:订单 */ @NoArgsConstructor @AllArgsConstructor @Data @Builder public class Order { private Long id; private Long userId; private Long productId; private Integer count; private BigDecimal money; /** * 订单状态:0:创建中;1:已完结 */ private Integer status; }
创建OrderDao接口, 使用@Mapper注解标注.
@Mapper
public interface OrderDao {
/**
* 创建订单
*/
void create(Order order);
/**
* 修改订单状态
*/
void update(@Param("userId") Long userId, @Param("status") Integer status);
}
创建OrderDao接口的映射文件, OrderMapper.xml.
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.atguigu.springcloud.dao.OrderDao"> <resultMap id="BaseResultMap" type="com.atguigu.springcloud.domain.Order"> <id column="id" property="id" jdbcType="BIGINT"/> <result column="user_id" property="userId" jdbcType="BIGINT"/> <result column="product_id" property="productId" jdbcType="BIGINT"/> <result column="count" property="count" jdbcType="INTEGER"/> <result column="money" property="money" jdbcType="DECIMAL"/> <result column="status" property="status" jdbcType="INTEGER"/> </resultMap> <insert id="create" parameterType="com.atguigu.springcloud.domain.Order"> INSERT INTO `t_order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`) VALUES (NULL, #{userId}, #{productId}, #{count}, #{money}, 0); </insert> <update id="update"> UPDATE `t_order` SET status = 1 WHERE user_id = #{userId} AND status = #{status}; </update> </mapper>
创建Service及实现
public interface OrderService {
/**
* 创建订单
*/
void create(Order order);
}
/**
* 类描述:远程调用库存服务扣减库存操作
*/
@FeignClient(value = "seata-storage-service")
public interface StorageService {
@PostMapping(value = "/storage/decrease")
CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
/**
* 类描述:远程调用账户服务扣减余额
*/
@FeignClient(value = "seata-account-service")
public interface AccountService {
@PostMapping(value = "/account/decrease")
CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
/** * 类描述:订单服务业务逻辑类 */ @Service @Slf4j public class OrderServiceImpl implements OrderService { @Resource private OrderDao orderDao; @Resource private StorageService storageService; @Resource private AccountService accountService; /** * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态 * 简单说: * 下订单->减库存->减余额->改状态 */ @Override // @GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class) public void create(Order order) { log.info("------->下单开始"); //本应用创建订单 orderDao.create(order); //远程调用库存服务扣减库存 log.info("------->order-service中扣减库存开始"); storageService.decrease(order.getProductId(), order.getCount()); log.info("------->order-service中扣减库存结束"); //远程调用账户服务扣减余额 log.info("------->order-service中扣减余额开始"); accountService.decrease(order.getUserId(), order.getMoney()); log.info("------->order-service中扣减余额结束"); //修改订单状态为已完成 log.info("------->order-service中修改订单状态开始"); orderDao.update(order.getProductId(), 0); log.info("------->order-service中修改订单状态结束"); log.info("------->下单结束"); } }
编写controller
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
/**
* 创建订单
*/
@GetMapping("/order/create")
public CommonResult create(Order order) {
orderService.create(order);
return new CommonResult(200, "订单创建成功!");
}
}
MybatisConfig配置扫描mapper接口的包.
@Configuration
@MapperScan(value = "com.atguigu.springcloud.dao")
public class MyBatisConfig {
}
配置代理数据源
/** * 类描述: 使用Seata对数据源进行代理 * io.seata.rm.datasource.DataSourceProxy */ @Configuration public class DataSourceProxyConfig { @Value("${mybatis.mapper-locations}") private String mapperLocations; @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource() { return new DruidDataSource(); } @Bean public DataSourceProxy dataSourceProxy(DataSource dataSource) { return new DataSourceProxy(dataSource); } @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSourceProxy); sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory()); return sqlSessionFactoryBean.getObject(); } }
创建主启动类
// 排除自动配置数据源,使用自定义的数据源
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableDiscoveryClient
@EnableFeignClients
public class SeataOrderMainApp2001 {
public static void main(String[] args) {
SpringApplication.run(SeataOrderMainApp2001.class);
}
}
按照上面一样的步骤创建库存微服务模块 seata-storage-service-2002
不一样的单独拿出来.
库存实体类Storage
@AllArgsConstructor @NoArgsConstructor @Data @Builder public class Storage { private Long id; /** * 产品id */ private Long productId; /** * 总库存 */ private Integer total; /** * 已用库存 */ private Integer used; /** * 剩余库存 */ private Integer residue; }
扣减库存逻辑. StorageController
@RestController
public class StorageController {
@Autowired
private StorageService storageService;
/**
* 扣减库存
*/
@RequestMapping("/storage/decrease")
public CommonResult decrease(Long productId, Integer count) {
storageService.decrease(productId, count);
return new CommonResult(200, "扣减库存成功!");
}
}
StorageServiceImpl
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
@Resource
private StorageDao storageDao;
/**
* 扣减库存
*/
@Override
public void decrease(Long productId, Integer count) {
log.info("------->storage-service中扣减库存开始");
storageDao.decrease(productId, count);
log.info("------->storage-service中扣减库存结束");
}
}
StorageDao
@Mapper
public interface StorageDao {
/**
* 扣减库存
*/
void decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
StorageMapper.xml映射文件
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.atguigu.springcloud.dao.StorageDao"> <resultMap id="BaseResultMap" type="com.atguigu.springcloud.domain.Storage"> <id column="id" property="id" jdbcType="BIGINT"/> <result column="product_id" property="productId" jdbcType="BIGINT"/> <result column="total" property="total" jdbcType="INTEGER"/> <result column="used" property="used" jdbcType="INTEGER"/> <result column="residue" property="residue" jdbcType="INTEGER"/> </resultMap> <update id="decrease"> UPDATE t_storage SET used = used + #{count}, residue = residue - #{count} WHERE product_id = #{productId} </update> </mapper>
一样的步骤创建账户微服务模块 seata-account-service-2003
扣减账户余额的逻辑单独拿出来. AccountController
@RestController
public class AccountController {
@Resource
private AccountService accountService;
/**
* 扣减账户余额
*/
@RequestMapping("/account/decrease")
public CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) {
accountService.decrease(userId, money);
return new CommonResult(200, "扣减账户余额成功!");
}
}
AccountServiceImpl
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Resource
private AccountDao accountDao;
/**
* 扣减账户余额
*/
@Override
public void decrease(Long userId, BigDecimal money) {
log.info("------->account-service中扣减账户余额开始");
accountDao.decrease(userId, money);
log.info("------->account-service中扣减账户余额结束");
}
}
AccountDao
@Mapper
public interface AccountDao {
/**
* 扣减账户余额
*/
void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
AccountMapper.xml映射文件
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.atguigu.springcloud.dao.AccountDao"> <resultMap id="BaseResultMap" type="com.atguigu.springcloud.domain.Account"> <id column="id" property="id" jdbcType="BIGINT"/> <result column="user_id" property="userId" jdbcType="BIGINT"/> <result column="total" property="total" jdbcType="DECIMAL"/> <result column="used" property="used" jdbcType="DECIMAL"/> <result column="residue" property="residue" jdbcType="DECIMAL"/> </resultMap> <update id="decrease"> UPDATE t_account SET residue = residue - #{money},used = used + #{money} WHERE user_id = #{userId}; </update> </mapper>
首先查询数据库的数据初始状态.
SELECT * FROM seata_order.t_order; 初始没有订单
SELECT * FROM seata_storage.t_storage; 库存100, 使用0, 剩余100.
SELECT * FROM seata_account.t_account; 余额1000, 使用0, 剩余1000.
首先, 启动Nacos, 启动Seata-server, 启动各位服务模块.
测试一笔正常下单. 访问 http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10
. 订单购买10个产品, 共10元.
在没有出现异常的情况下, 各模块的数据结果正常. 订单库新增了一笔订单数据且订单状态已完成, 还剩下90个库存, 账户余额剩下990元.
订单service#create方法中不添加@GlobalTransactional
, 没有全局事务 ,然后在账户模块扣减余额的service#decrease方法中添加超时模拟.
@Override
public void decrease(Long userId, BigDecimal money) {
log.info("------->account-service中扣减账户余额开始");
accountDao.decrease(userId, money);
// 模拟超时, 30s远远超过了远程调用的默认超时时间.
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("------->account-service中扣减账户余额结束");
}
再次访问http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10
因为扣减账户余额超时,导致交易失败.
我们再来查看db中的各模块的数据结果如何. 从下面的数据中可以看到, 订单库增加了一条新的订单,但是状态是未完成(因为扣减余额失败). 但是库存和账户已经成功扣减了,出现了数据不一致的问题. 需要通过分布式事务来协调各个分支模块的事务原子性.
在订单的service#create方法上添加注解@GlobalTransactional
, 来开启全局事务.
@Override @GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class) public void create(Order order) { log.info("------->下单开始"); //本应用创建订单 orderDao.create(order); //远程调用库存服务扣减库存 log.info("------->order-service中扣减库存开始"); storageService.decrease(order.getProductId(), order.getCount()); log.info("------->order-service中扣减库存结束"); //远程调用账户服务扣减余额 log.info("------->order-service中扣减余额开始"); accountService.decrease(order.getUserId(), order.getMoney()); log.info("------->order-service中扣减余额结束"); //修改订单状态为已完成 log.info("------->order-service中修改订单状态开始"); orderDao.update(order.getProductId(), 0); log.info("------->order-service中修改订单状态结束"); log.info("------->下单结束"); }
重启后订单微服务应用,再次访问 http://localhost:2001//order/create?userId=1&productId=1&count=10&money=10
, 因为超时一样导致交易失败.
但是查看库表数据, 还是上次的结果, 没有产生新的订单, 也没有扣减库存和账户余额. 因为全局事务的协调, 当出现异常时, 事务发起者TM(订单模块)向事务协调者TC发送事务回滚的消息, 事务协调者TC再向各分支事务发起回滚通知, TM进行本地事务回滚.
下面, 我们通过debug断点调试来跟进这个事务回滚的过程.
断点位置:
现在, 我们来触发一次交易, 跳转到断点位置.
然后,我们来查看库表的数据及各自的状态. 根据断点停留的位置, 此时应该是创建了一个新的订单(状态未完成0), 库存(70)和账户余额(970)都被扣减了10. 但是此时的全局事务还没有提交.
因为全局事务还没有提交, 我们来看看seata-server的事务相关的表数据.
SELECT * FROM seata.global_table; 全局事务表中, 产生了一个全局事务XID, 事务发起者seata-order-service
, 事务群组是自定义配置的fsp_tx_group
SELECT * FROM seata.branch_table; 分支事务表中, 全局事务XID下面产生了三个分支事务, 事务涉及三个模块的资源. branch_type=AT表示seata的事务模式, 后面会讲.
SELECT * FROM seata.lock_table; 事务锁中, 可以看到三张业务表(t_order, t_storage, t_account)都被锁了.
我们再来看业务数据库下面的undo_log控制事务的表数据.
SELECT * FROM seata_order.undo_log;
SELECT * FROM seata_storage.undo_log; 全局XID 192.168.65.1:8091:2095810330 那条.
SELECT * FROM seata_account.undo_log;
上面三个业务数据库的undo_log都记录了rollback_info数据, 里面记录了db更新之前的快照和更新之后的数据, 方便在全局事务发起提交或回滚用. 一旦全局事务发起提交或回滚通知, 各分支事务会根据rollback_info的快照信息进行本地事务提交或回滚. 全局事务完成后, 删除undo_log日志数据.
// 账户表的快照信息 { "@class": "io.seata.rm.datasource.undo.BranchUndoLog", "xid": "192.168.65.1:8091:2095813945", // 全局事务XID "branchId": 2095813953, // 分支事务ID "sqlUndoLogs": [ "java.util.ArrayList", [ { "@class": "io.seata.rm.datasource.undo.SQLUndoLog", "sqlType": "UPDATE", "tableName": "t_account", "beforeImage": { // 前置快照 beforeImage "@class": "io.seata.rm.datasource.sql.struct.TableRecords", "tableName": "t_account", // 账户表 "rows": [ "java.util.ArrayList", [ { "@class": "io.seata.rm.datasource.sql.struct.Row", "fields": [ "java.util.ArrayList", [ { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PrimaryKey", "type": -5, "value": [ "java.lang.Long", 1 ] }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "residue", // 余额字段 "keyType": "NULL", "type": 3, "value": [ "java.math.BigDecimal", 980 // before值980 ] }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "used", "keyType": "NULL", "type": 3, "value": [ "java.math.BigDecimal", 20 ] } ] ] } ] ] }, "afterImage": { // 后置快照 afterImage "@class": "io.seata.rm.datasource.sql.struct.TableRecords", "tableName": "t_account", "rows": [ "java.util.ArrayList", [ { "@class": "io.seata.rm.datasource.sql.struct.Row", "fields": [ "java.util.ArrayList", [ { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PrimaryKey", "type": -5, "value": [ "java.lang.Long", 1 ] }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "residue", // 余额字段 "keyType": "NULL", "type": 3, "value": [ "java.math.BigDecimal", 970 // after值 970 ] }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "used", "keyType": "NULL", "type": 3, "value": [ "java.math.BigDecimal", 30 ] } ] ] } ] ] } } ] ] }
我们现在放开断点, 让程序走完流程. 发现交易失败, 数据发生了回滚, seata-server的全局事务表(global_table), 分支事务表(branch_table), 事务锁的表(lock_table), 以及每个业务数据库的undo_log日志数据都被删除.
再看TC/TM/RM/三大组件:
AT模式是如何做到对业务的无侵入, 有两个前提:
两阶段提交的整体机制:
一阶段提交
,Seata 会拦截“业务 SQL”,流程见图(一阶段提交)
业务 SQL
更新业务数据, 业务数据更新之后, 保存成“after image”,最后生成行锁(lock表)。二阶段提交
,因为“业务 SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。见下图(二阶段提交)二阶段回滚
,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。见下图(二阶段回滚)欢迎访问个人博客: https://www.crystalblog.xyz/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。