当前位置:   article > 正文

五阶段--分布式事务--Seata TCC模式-TCC模式/Spring Cloud微服务添加 TCC 分布式事务_微服务tcc

微服务tcc

目录

一 TCC 基本原理

 二 TCC 事务

1 创建工程,导入项目

2 添加TCC事务

2.1 订单order工程

2.2 库存storage工程

2.3 账户Account 工程


分布式事务(六)Seata TCC模式-TCC模式介绍

一 TCC 基本原理

TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:

  • TCC 对业务代码侵入严重
    每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。
  • TCC 效率更高
    不必对数据加全局锁,允许多个事务并发同时操作数据。

a

 二 TCC 事务

1 创建工程,导入项目

第一步: 创建 empty project: seata-tcc  ,独立的project工程,要与seata-at工程区分开

在这里插入图片描述

第二步: "seata-tcc/无事务版本.zip",解压后,只能一个个导入,project Structure-->modules-->import module-->挨个找到七个工程导入

 或者通过右侧的Maven中的加号找到七个工程的pom文件导入

在 idea 中按两下 shift 键,搜索 add maven projects,打开 maven 工具:

a

然后选择 seata-tcc 工程目录下的 7 个项目的 pom.xml 导入:

a

2 添加TCC事务

2.1 订单order工程

我们要添加以下 TCC 事务操作的代码:

  • Try - 第一阶,冻结数据阶段,向订单表直接插入订单,订单状态设置为0(冻结状态)。

a

  • Confirm - 第二阶段,提交事务,将订单状态修改成1(正常状态)。

a

  • Cancel - 第二阶段,回滚事务,删除订单。

a

第一步: order-parent1父工程添加seata 依赖

  1. <!--分布式事务seata依赖-->
  2. <dependency>
  3. <groupId>com.alibaba.cloud</groupId>
  4. <artifactId>spring-cloud-alibaba-seata</artifactId>
  5. <version>${spring-cloud-alibaba-seata.version}</version>
  6. <exclusions>
  7. <exclusion>
  8. <artifactId>seata-all</artifactId>
  9. <groupId>io.seata</groupId>
  10. </exclusion>
  11. </exclusions>
  12. </dependency>
  13. <dependency>
  14. <groupId>io.seata</groupId>
  15. <artifactId>seata-all</artifactId>
  16. <version>${seata.version}</version>
  17. </dependency>

第二步:三个配置文件

  • application.yml -- 设置全局事务组的组名
  1. spring:
  2. application:
  3. name: order
  4. datasource:
  5. driver-class-name: com.mysql.cj.jdbc.Driver
  6. url: jdbc:mysql:///seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
  7. username: root
  8. password: root
  9. # 事务组设置
  10. cloud:
  11. alibaba:
  12. seata:
  13. tx-service-group: order_tx_group
  14. server:
  15. port: 8083
  16. eureka:
  17. client:
  18. service-url:
  19. defaultZone: http://localhost:8761/eureka
  20. instance:
  21. prefer-ip-address: true
  22. # instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
  23. mybatis-plus:
  24. type-aliases-package: cn.tedu.order.entity
  25. mapper-locations: classpath:mapper/*.xml
  26. configuration:
  27. map-underscore-to-camel-case: true
  28. logging:
  29. level:
  30. cn.tedu.order.mapper: DEBUG
  • registry.conf -- 注册中心地址
  1. registry {
  2. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  3. type = "eureka"
  4. nacos {
  5. serverAddr = "localhost"
  6. namespace = ""
  7. cluster = "default"
  8. }
  9. eureka {
  10. serviceUrl = "http://localhost:8761/eureka"
  11. # application = "default"
  12. # weight = "1"
  13. }
  14. redis {
  15. serverAddr = "localhost:6379"
  16. db = "0"
  17. password = ""
  18. cluster = "default"
  19. timeout = "0"
  20. }
  21. zk {
  22. cluster = "default"
  23. serverAddr = "127.0.0.1:2181"
  24. session.timeout = 6000
  25. connect.timeout = 2000
  26. username = ""
  27. password = ""
  28. }
  29. consul {
  30. cluster = "default"
  31. serverAddr = "127.0.0.1:8500"
  32. }
  33. etcd3 {
  34. cluster = "default"
  35. serverAddr = "http://localhost:2379"
  36. }
  37. sofa {
  38. serverAddr = "127.0.0.1:9603"
  39. application = "default"
  40. region = "DEFAULT_ZONE"
  41. datacenter = "DefaultDataCenter"
  42. cluster = "default"
  43. group = "SEATA_GROUP"
  44. addressWaitTime = "3000"
  45. }
  46. file {
  47. name = "file.conf"
  48. }
  49. }
  50. config {
  51. # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  52. type = "file"
  53. nacos {
  54. serverAddr = "localhost"
  55. namespace = ""
  56. group = "SEATA_GROUP"
  57. }
  58. consul {
  59. serverAddr = "127.0.0.1:8500"
  60. }
  61. apollo {
  62. app.id = "seata-server"
  63. apollo.meta = "http://192.168.1.204:8801"
  64. namespace = "application"
  65. }
  66. zk {
  67. serverAddr = "127.0.0.1:2181"
  68. session.timeout = 6000
  69. connect.timeout = 2000
  70. username = ""
  71. password = ""
  72. }
  73. etcd3 {
  74. serverAddr = "http://localhost:2379"
  75. }
  76. file {
  77. name = "file.conf"
  78. }
  79. }
  • file.conf -- 事务组对应使用的事务协调器,因为可能使用多个事务协调器,必须一一对应
  1. transport {
  2. # tcp udt unix-domain-socket
  3. type = "TCP"
  4. #NIO NATIVE
  5. server = "NIO"
  6. #enable heartbeat
  7. heartbeat = true
  8. # the client batch send request enable
  9. enableClientBatchSendRequest = true
  10. #thread factory for netty
  11. threadFactory {
  12. bossThreadPrefix = "NettyBoss"
  13. workerThreadPrefix = "NettyServerNIOWorker"
  14. serverExecutorThread-prefix = "NettyServerBizHandler"
  15. shareBossWorker = false
  16. clientSelectorThreadPrefix = "NettyClientSelector"
  17. clientSelectorThreadSize = 1
  18. clientWorkerThreadPrefix = "NettyClientWorkerThread"
  19. # netty boss thread size,will not be used for UDT
  20. bossThreadSize = 1
  21. #auto default pin or 8
  22. workerThreadSize = "default"
  23. }
  24. shutdown {
  25. # when destroy server, wait seconds
  26. wait = 3
  27. }
  28. serialization = "seata"
  29. compressor = "none"
  30. }
  31. service {
  32. #transaction service group mapping
  33. # order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
  34. # “seata-server” 与 TC 服务器的注册名一致
  35. # 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
  36. vgroupMapping.order_tx_group = "seata-server"
  37. #only support when registry.type=file, please don't set multiple addresses
  38. order_tx_group.grouplist = "127.0.0.1:8091"
  39. #degrade, current not support
  40. enableDegrade = false
  41. #disable seata
  42. disableGlobalTransaction = false
  43. }
  44. client {
  45. rm {
  46. asyncCommitBufferLimit = 10000
  47. lock {
  48. retryInterval = 10
  49. retryTimes = 30
  50. retryPolicyBranchRollbackOnConflict = true
  51. }
  52. reportRetryCount = 5
  53. tableMetaCheckEnable = false
  54. reportSuccessEnable = false
  55. }
  56. tm {
  57. commitRetryCount = 5
  58. rollbackRetryCount = 5
  59. }
  60. undo {
  61. dataValidation = true
  62. logSerialization = "jackson"
  63. logTable = "undo_log"
  64. }
  65. log {
  66. exceptionRate = 100
  67. }
  68. }

第三步: 修改Mapper , 添加 TCC 三个数据库操作

  1. package cn.tedu.order.mapper;
  2. import cn.tedu.order.entity.Order;
  3. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  4. public interface OrderMapper extends BaseMapper<Order> {
  5. void create(Order order);
  6. void insertFrozen(Order order); //插入冻结状态订单
  7. void updateStatus(Long orderId, Integer status); //修改状态
  8. // 删除方法 deleteById(orderId) 从通用Mapper继承
  9. }
  • 修改resource包下的mapper包的OrderMapper.xml文件,添加上面三个方法的sql语句
  1. <insert id="insertFrozen">
  2. INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
  3. VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money},#{status});
  4. </insert>
  5. <update id="updateStatus">
  6. update `order` set status=#{status} where id=#{orderId}
  7. </update>
  8. <delete id="deleteById">
  9. delete from `order` where id=#{orderId}
  10. </delete>

第四步:  第一阶段Try成功后,第二阶段为了处理幂等性问题这里首先添加一个工具类 ResultHolder

  • 这个工具也可以在第二阶段 Confirm 或 Cancel 阶段对第一阶段的成功与否进行判断,在第一阶段成功时需要保存一个标识。
  • ResultHolder可以为每一个全局事务保存一个标识:复制粘贴使用即可
  1. package cn.tedu.order.tcc;
  2. import java.util.Map;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. public class ResultHolder {
  5. private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
  6. public static void setResult(Class<?> actionClass, String xid, String v) {
  7. Map<String, String> results = map.get(actionClass);
  8. if (results == null) {
  9. synchronized (map) {
  10. if (results == null) {
  11. results = new ConcurrentHashMap<>();
  12. map.put(actionClass, results);
  13. }
  14. }
  15. }
  16. results.put(xid, v);
  17. }
  18. public static String getResult(Class<?> actionClass, String xid) {
  19. Map<String, String> results = map.get(actionClass);
  20. if (results != null) {
  21. return results.get(xid);
  22. }
  23. return null;
  24. }
  25. public static void removeResult(Class<?> actionClass, String xid) {
  26. Map<String, String> results = map.get(actionClass);
  27. if (results != null) {
  28. results.remove(xid);
  29. }
  30. }
  31. }

 第五步:按照 seata tcc 的规则,定义 TccAcction 接口和实现类

order工程创建tcc包:

  1. package cn.tedu.order.tcc;
  2. import cn.tedu.order.entity.Order;
  3. import io.seata.rm.tcc.api.BusinessActionContext;
  4. import io.seata.rm.tcc.api.BusinessActionContextParameter;
  5. import io.seata.rm.tcc.api.LocalTCC;
  6. import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
  7. import java.math.BigDecimal;
  8. /**
  9. * 按照 Seata 规则,实现 TccAction 接口
  10. * 1. 添加 @LocalTcc 注解
  11. * 2. 添加三个方法-- TCC 方法
  12. * 3. 第一个方法上添加"两阶段业务操作"注解,
  13. * 并指定后面两个方法的方法名
  14. * 4. 三个方法上添加 BusinessActionContext 上下文对象作为参数,
  15. * 用来从第一阶段向第二阶段传递参数
  16. * 5. 传递的参数数据,用 @BusinessActionContextParameter 注解放入上下文对象
  17. * */
  18. @LocalTCC
  19. public interface OrderTccAction {
  20. /**
  21. * 避开seata的bug,订单数据一个一个单独传递,而不用封装的Order 对象
  22. * */
  23. //表示这些是第一阶段方法,默认方法名,就不用配置提交方法和回滚方法
  24. @TwoPhaseBusinessAction(name =" OrderTccAction")
  25. boolean prepare(BusinessActionContext ctx,
  26. @BusinessActionContextParameter(paramName = "orderId") Long orderId,
  27. Long userId,
  28. Long productId,
  29. Integer count,
  30. BigDecimal money);
  31. boolean commit(BusinessActionContext ctx);
  32. boolean rollback(BusinessActionContext ctx);
  33. }

实现类:

  1. package cn.tedu.order.tcc;
  2. import cn.tedu.order.entity.Order;
  3. import cn.tedu.order.mapper.OrderMapper;
  4. import io.seata.rm.tcc.api.BusinessActionContext;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.transaction.annotation.Transactional;
  8. import java.math.BigDecimal;
  9. @Component
  10. public class OrderTccActionImpl implements OrderTccAction {
  11. @Autowired
  12. private OrderMapper orderMapper;
  13. /*一阶段*/
  14. @Transactional
  15. @Override
  16. public boolean prepare(BusinessActionContext ctx, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) {
  17. // 冻结订单
  18. orderMapper.insertFrozen(new Order(orderId,userId,productId,count,money,0));
  19. // 保存一阶段的成功标记--两个键一个标记
  20. ResultHolder.setResult(OrderTccAction.class,ctx.getXid(),"p");
  21. return true;
  22. }
  23. /*二阶段*/
  24. @Transactional
  25. @Override
  26. //加同步锁,等待删除成功后才能继续执行其他的
  27. public synchronized boolean commit(BusinessActionContext ctx) {
  28. // 判断标记p是否存在--标记不存在,表示二阶段要么执行过,要么一阶段失败,二阶段不再执行
  29. if (ResultHolder.getResult(OrderTccAction.class,ctx.getXid()) == null){
  30. return true;//没有标记到此结束
  31. }
  32. Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
  33. orderMapper.updateStatus(orderId, 1);
  34. // 二阶段执行完成,删除键标记
  35. ResultHolder.removeResult(OrderTccAction.class,ctx.getXid());
  36. return true;
  37. }
  38. /*二阶段*/
  39. @Transactional
  40. @Override
  41. //加同步锁,等待删除成功后才能继续执行其他的
  42. public synchronized boolean rollback(BusinessActionContext ctx) {
  43. // 判断标记p是否存在--标记不存在,表示二阶段要么执行过,要么一阶段失败,二阶段不再执行
  44. if (ResultHolder.getResult(OrderTccAction.class,ctx.getXid()) == null){
  45. return true;//没有标记到此结束
  46. }
  47. Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
  48. orderMapper.deleteById(orderId);
  49. // 二阶段执行完成,删除键标记
  50. ResultHolder.removeResult(OrderTccAction.class,ctx.getXid());
  51. return true;
  52. }
  53. }

第六步:修改业务方法,调用 TccAcction 的第一阶段方法(Try--预留数据),在第一个模块上添加 @GlobalTransactional 启动全局事务

  1. package cn.tedu.order.service;
  2. import cn.tedu.order.entity.Order;
  3. import cn.tedu.order.feign.AccountClient;
  4. import cn.tedu.order.feign.EasyIdClient;
  5. import cn.tedu.order.feign.StorageClient;
  6. import cn.tedu.order.mapper.OrderMapper;
  7. import cn.tedu.order.tcc.OrderTccAction;
  8. import io.seata.spring.annotation.GlobalTransactional;
  9. import org.checkerframework.checker.units.qual.A;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.stereotype.Service;
  12. import java.util.Random;
  13. @Service
  14. public class OrderServiceImpl implements OrderService {
  15. @Autowired
  16. private EasyIdClient easyIdClient;
  17. @Autowired
  18. private StorageClient storageClient;
  19. @Autowired
  20. private AccountClient accountClient;
  21. @Autowired
  22. private OrderTccAction tcc;
  23. @GlobalTransactional //启动全局事务
  24. @Override
  25. public void create(Order order) {
  26. // 远程调用发号器,获取订单id
  27. String s = easyIdClient.nextId("order_business");
  28. Long orderId = Long.valueOf(s);
  29. order.setId(orderId);
  30. // orderMapper.create(order);
  31. /*第一个参数上下文对象,在tcc的动态代理对象中,通过 AOP 添加了前置通知,
  32. * 在前置代码中回创建上下文对象
  33. * */
  34. //不直接正常的创建订单,而是调用TccAction一阶段方法,冻结订单
  35. tcc.prepare(null,
  36. order.getId(),
  37. order.getUserId(),
  38. order.getProductId(),
  39. order.getCount(),
  40. order.getMoney());
  41. // 远程调用storage库存,减少库存
  42. //storageClient.decrease(order.getProductId(),order.getCount());
  43. // 远程调用account账户,减少账户余额
  44. //accountClient.decrease(order.getUserId(),order.getMoney());
  45. }
  46. }

第七步:启动 order 进行测试

按顺序启动服务:

Eureka
Seata Server
Easy Id Generator
Order
调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

观察order工程控制台日志:
a

 查看数据库表中的订单数据:

a

2.2 库存storage工程

我们要添加以下 TCC 事务操作的代码:

  • Try - 第一阶,冻结数据阶段,将要减少的库存量先冻结:

a

  • Confirm - 第二阶段,提交事务,使用冻结的库存完成业务数据处理:

a

  • Cancel - 第二阶段,回滚事务,冻结的库存解冻,恢复以前的库存量:

a

第一步:有三个文件需要配置:

  • application.yml
  1. spring:
  2. application:
  3. name: order
  4. datasource:
  5. driver-class-name: com.mysql.cj.jdbc.Driver
  6. url: jdbc:mysql:///seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
  7. username: root
  8. password: root
  9. # 事务组设置
  10. cloud:
  11. alibaba:
  12. seata:
  13. tx-service-group: order_tx_group
  14. server:
  15. port: 8083
  16. eureka:
  17. client:
  18. service-url:
  19. defaultZone: http://localhost:8761/eureka
  20. instance:
  21. prefer-ip-address: true
  22. # instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
  23. mybatis-plus:
  24. type-aliases-package: cn.tedu.order.entity
  25. mapper-locations: classpath:mapper/*.xml
  26. configuration:
  27. map-underscore-to-camel-case: true
  28. logging:
  29. level:
  30. cn.tedu.order.mapper: DEBUG
  • registry.conf
  • file.conf

这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。

第二步: 工具类 ResultHolder

  1. package cn.tedu.storage.tcc;
  2. import java.util.Map;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. public class ResultHolder {
  5. private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
  6. public static void setResult(Class<?> actionClass, String xid, String v) {
  7. Map<String, String> results = map.get(actionClass);
  8. if (results == null) {
  9. synchronized (map) {
  10. if (results == null) {
  11. results = new ConcurrentHashMap<>();
  12. map.put(actionClass, results);
  13. }
  14. }
  15. }
  16. results.put(xid, v);
  17. }
  18. public static String getResult(Class<?> actionClass, String xid) {
  19. Map<String, String> results = map.get(actionClass);
  20. if (results != null) {
  21. return results.get(xid);
  22. }
  23. return null;
  24. }
  25. public static void removeResult(Class<?> actionClass, String xid) {
  26. Map<String, String> results = map.get(actionClass);
  27. if (results != null) {
  28. results.remove(xid);
  29. }
  30. }
  31. }

第三步:StorageMapper接口 添加冻结库存相关方法 以及实现类的方法重写

根据前面的分析,库存数据操作有以下三项:

  • 冻结库存
  • 冻结库存量修改为已售出量
  • 解冻库存
  1. package cn.tedu.storage.mapper;
  2. import cn.tedu.storage.entity.Storage;
  3. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  4. public interface StorageMapper extends BaseMapper<Storage> {
  5. void decrease(Long productId,Integer count);
  6. Storage selectByProductId(Long productId);
  7. void updateResidueToFrozen(Long productId,Integer count);//可用-->冻结
  8. void updateFrozenToUsed(Long productId,Integer count);//冻结-->已使用
  9. void updateFrozenToResidue(Long productId,Integer count);//冻结-->可用
  10. }
  1. package cn.tedu.storage.tcc;
  2. import cn.tedu.storage.entity.Storage;
  3. import cn.tedu.storage.mapper.StorageMapper;
  4. import io.seata.rm.tcc.api.BusinessActionContext;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.transaction.annotation.Transactional;
  8. @Component
  9. public class StorageTccActionImpl implements StorageTccAction {
  10. @Autowired
  11. private StorageMapper storageMapper;
  12. /*一阶段*/
  13. @Transactional
  14. @Override
  15. public boolean prepare(BusinessActionContext ctx, Long productId, Integer count) {
  16. Storage storage =
  17. storageMapper.selectByProductId(productId);
  18. if (storage.getResidue() < count) {
  19. throw new RuntimeException("库存不足");
  20. }
  21. storageMapper.updateResidueToFrozen(productId,count);
  22. // 保存一阶段的成功标记--两个键一个标记
  23. ResultHolder.setResult(StorageTccAction.class,ctx.getXid(),"p");
  24. return true;
  25. }
  26. /*二阶段*/
  27. @Transactional
  28. @Override
  29. //加同步锁,等待删除成功后才能继续执行其他的
  30. public synchronized boolean commit(BusinessActionContext ctx) {
  31. // 判断标记p是否存在--标记不存在,表示二阶段要么执行过,要么一阶段失败,二阶段不再执行
  32. if (ResultHolder.getResult(StorageTccAction.class,ctx.getXid()) == null){
  33. return true;//没有标记到此结束
  34. }
  35. Long productId = Long.valueOf(ctx.getActionContext("productId").toString());
  36. Integer count = Integer.valueOf(ctx.getActionContext("count").toString());
  37. storageMapper.updateFrozenToUsed(productId,count);
  38. // 二阶段执行完成,删除键标记
  39. ResultHolder.removeResult(StorageTccAction.class,ctx.getXid());
  40. return true;
  41. }
  42. /*二阶段*/
  43. @Transactional
  44. @Override
  45. //加同步锁,等待删除成功后才能继续执行其他的
  46. public synchronized boolean rollback(BusinessActionContext ctx) {
  47. // 判断标记p是否存在--标记不存在,表示二阶段要么执行过,要么一阶段失败,二阶段不再执行
  48. if (ResultHolder.getResult(StorageTccAction.class,ctx.getXid()) == null){
  49. return true;//没有标记到此结束
  50. }
  51. Long productId = Long.valueOf(ctx.getActionContext("productId").toString());
  52. Integer count = Integer.valueOf(ctx.getActionContext("count").toString());
  53. storageMapper.updateFrozenToResidue(productId,count);
  54. // 二阶段执行完成,删除键标记
  55. ResultHolder.removeResult(StorageTccAction.class,ctx.getXid());
  56. return true;
  57. }
  58. }

添加本地事务管理:

第四步:在StorageMapper对应的xml核心配置文件中编辑这三个方法的SQL语句

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3. <mapper namespace="cn.tedu.storage.mapper.StorageMapper" >
  4. <resultMap id="BaseResultMap" type="cn.tedu.storage.entity.Storage" >
  5. <id column="id" property="id" jdbcType="BIGINT" />
  6. <result column="product_id" property="productId" jdbcType="BIGINT" />
  7. <result column="total" property="total" jdbcType="INTEGER" />
  8. <result column="used" property="used" jdbcType="INTEGER" />
  9. <result column="residue" property="residue" jdbcType="INTEGER" />
  10. </resultMap>
  11. <update id="decrease">
  12. UPDATE storage SET used = used+#{count},residue=residue-#{count} WHERE product_id = #{productId}
  13. </update>
  14. <select id="selectByProductId" resultMap="BaseResultMap">
  15. select * from storage where product_id = #{productId}
  16. </select>
  17. <update id="updateResidueToFrozen">
  18. update storage
  19. set Residue=Residue-#{count},Frozen=Frozen+#{count}
  20. where product_id = #{productId}
  21. </update>
  22. <update id="updateFrozenToUsed">
  23. update storage
  24. set Frozen=Frozen - #{count},Used = Used + #{count}
  25. where product_id = #{productId}
  26. </update>
  27. <update id="updateFrozenToResidue">
  28. update storage
  29. set Frozen=Frozen - #{count},Residue=Residue + #{count}
  30. where product_id = #{productId}
  31. </update>
  32. </mapper>

第五步: 库存接口实现类调用 Try 阶段方法,业务代码中调用 TCC 第一阶段方法prepareDecreaseStorage(),并添加全局事务注解 @GlobalTransactional

  1. package cn.tedu.storage.service;
  2. import cn.tedu.storage.mapper.StorageMapper;
  3. import cn.tedu.storage.tcc.StorageTccAction;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class StorageServiceImpl implements StorageService {
  8. @Autowired
  9. private StorageTccAction tcc;
  10. //@GlobalTransactional //此时可以不用添加
  11. @Override
  12. public void decrease(Long productId, Integer count) {
  13. tcc.prepare(null,productId,count);
  14. }
  15. }
第六步:order工程服务层实现类打开注释掉的远程调用库存的方法--OrderServiceimpl
第七步:启动storage 测试
按顺序启动服务:Eureka-->Seata Server-->Easy Id Generator-->Storage-->Order
查看localhost:8761  eureka服务,查看服务是否注册成功


调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

  网页返回创建订单成功,库存没有时返回500

  • 观察 storage 的控制台日志:

a

  • 查看数据库表中的库存数据:

a

  • 反复的提交订单,库存同时减少,知道库存不足可看到下面效果:

  •  注意:

2.3 账户Account 工程

第一步: 添加seata依赖,此处因为order-parent父工程添加了,此处可以不再添加

第二步:三个配置,可以从上面两个工程中复制粘贴

  • application.yml
  • file.conf
  • registry.conf

第三步:mapper层接口添加调用方法

  1. package cn.tedu.account.mapper;
  2. import cn.tedu.account.entity.Account;
  3. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  4. import java.math.BigDecimal;
  5. public interface AccountMapper extends BaseMapper<Account> {
  6. void decrease(Long userId, BigDecimal money);
  7. Account selectByUserId(Long userId);
  8. void updateResidueToFrozen(Long userId,BigDecimal money);//可用-->冻结
  9. void updateFrozenToUsed(Long userId,BigDecimal money);//冻结-->已使用
  10. void updateFrozenToResidue(Long userId,BigDecimal money);//冻结-->可用
  11. }

第四步: resources包下mapper包的accountmapper.xml编辑SQL语句

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3. <mapper namespace="cn.tedu.account.mapper.AccountMapper" >
  4. <resultMap id="BaseResultMap" type="Account" >
  5. <id column="id" property="id" jdbcType="BIGINT" />
  6. <result column="user_id" property="userId" jdbcType="BIGINT" />
  7. <result column="total" property="total" jdbcType="DECIMAL" />
  8. <result column="used" property="used" jdbcType="DECIMAL" />
  9. <result column="residue" property="residue" jdbcType="DECIMAL"/>
  10. <result column="frozen" property="frozen" jdbcType="DECIMAL"/>
  11. </resultMap>
  12. <update id="decrease">
  13. UPDATE account
  14. SET residue = residue - #{money},used = used + #{money}
  15. where user_id = #{userId};
  16. </update>
  17. <select id="selectByUserId" resultMap="BaseResultMap">
  18. select * from account where user_id = #{userId};
  19. </select>
  20. <update id="updateResidueToFrozen">
  21. update account
  22. set Residue=Residue-#{money}, Frozen=Frozen+#{money}
  23. where user_id = #{userId};
  24. </update>
  25. <update id="updateFrozenToUsed">
  26. update account
  27. set Frozen=Frozen-#{money}, Used=Used+#{money}
  28. where user_id = #{userId};
  29. </update>
  30. <update id="updateFrozenToResidue">
  31. update account
  32. set Frozen=Frozen-#{money}, Residue=Residue+#{money}
  33. where user_id = #{userId};
  34. </update>
  35. </mapper>

第五步: 创建tcc包,添加标识工具类ResultHolder

  1. package cn.tedu.account.tcc;
  2. import java.util.Map;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. public class ResultHolder {
  5. private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();
  6. public static void setResult(Class<?> actionClass, String xid, String v) {
  7. Map<String, String> results = map.get(actionClass);
  8. if (results == null) {
  9. synchronized (map) {
  10. if (results == null) {
  11. results = new ConcurrentHashMap<>();
  12. map.put(actionClass, results);
  13. }
  14. }
  15. }
  16. results.put(xid, v);
  17. }
  18. public static String getResult(Class<?> actionClass, String xid) {
  19. Map<String, String> results = map.get(actionClass);
  20. if (results != null) {
  21. return results.get(xid);
  22. }
  23. return null;
  24. }
  25. public static void removeResult(Class<?> actionClass, String xid) {
  26. Map<String, String> results = map.get(actionClass);
  27. if (results != null) {
  28. results.remove(xid);
  29. }
  30. }
  31. }

第六步: 按照 seata tcc 的规则,定义 TccAcction 接口和实现类

  1. package cn.tedu.account.tcc;
  2. import io.seata.rm.tcc.api.BusinessActionContext;
  3. import io.seata.rm.tcc.api.BusinessActionContextParameter;
  4. import io.seata.rm.tcc.api.LocalTCC;
  5. import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
  6. import java.math.BigDecimal;
  7. @LocalTCC
  8. public interface AccountTccAction {
  9. @TwoPhaseBusinessAction(name = "AccountTccAction")
  10. boolean prepare(BusinessActionContext ctx ,
  11. @BusinessActionContextParameter(paramName = "userId") Long userId,
  12. @BusinessActionContextParameter(paramName = "money")BigDecimal money);
  13. boolean commit(BusinessActionContext ctx);
  14. boolean rollback(BusinessActionContext ctx);
  15. }
  • 实现类:
  1. package cn.tedu.account.tcc;
  2. import cn.tedu.account.entity.Account;
  3. import cn.tedu.account.mapper.AccountMapper;
  4. import io.seata.rm.tcc.api.BusinessActionContext;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.transaction.annotation.Transactional;
  8. import java.math.BigDecimal;
  9. @Component
  10. public class AccountTccActionImpl implements AccountTccAction {
  11. @Autowired
  12. private AccountMapper accountMapper;
  13. /*一阶段*/
  14. @Transactional
  15. @Override
  16. public boolean prepare(BusinessActionContext ctx, Long userId, BigDecimal money) {
  17. Account account = accountMapper.selectByUserId(userId);
  18. //判断可用金额
  19. if (account.getResidue().compareTo(money) < 0) {
  20. throw new RuntimeException("可用金额已不足");
  21. }
  22. accountMapper.updateResidueToFrozen(userId,money);
  23. //标记
  24. ResultHolder.setResult(AccountTccAction.class,ctx.getXid(),"p");
  25. return true;
  26. }
  27. /*二阶段*/
  28. @Transactional
  29. @Override
  30. public synchronized boolean commit(BusinessActionContext ctx) {
  31. if (ResultHolder.getResult(AccountTccAction.class,ctx.getXid()) == null){
  32. return true;
  33. }
  34. Long userId = Long.valueOf(ctx.getActionContext("userId").toString());
  35. BigDecimal money = new BigDecimal(ctx.getActionContext("money").toString());
  36. accountMapper.updateFrozenToUsed(userId,money);
  37. ResultHolder.removeResult(AccountTccAction.class,ctx.getXid());
  38. return true;
  39. }
  40. /*二阶段*/
  41. @Transactional
  42. @Override
  43. public synchronized boolean rollback(BusinessActionContext ctx) {
  44. if (ResultHolder.getResult(AccountTccAction.class,ctx.getXid()) == null){
  45. return true;
  46. }
  47. Long userId = Long.valueOf(ctx.getActionContext("userId").toString());
  48. BigDecimal money = new BigDecimal(ctx.getActionContext("money").toString());
  49. accountMapper.updateFrozenToResidue(userId,money);
  50. ResultHolder.removeResult(AccountTccAction.class,ctx.getXid());
  51. return true;
  52. }
  53. }

第七步:  账户接口实现类调用 Try 阶段方法,业务代码中调用 TCC 第一阶段方法prepareDecreaseStorage(),并添加全局事务注解 @GlobalTransactional

  1. package cn.tedu.account.service;
  2. import cn.tedu.account.mapper.AccountMapper;
  3. import cn.tedu.account.tcc.AccountTccAction;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. import java.math.BigDecimal;
  7. @Service
  8. public class AccountServiceImpl implements AccountService {
  9. @Autowired
  10. private AccountTccAction tcc;
  11. @Override
  12. public void decrease(Long userId, BigDecimal money) {
  13. tcc.prepare(null,userId,money);
  14. }
  15. }

第八步:放开orderServiceImpl实现类的账户远程调用的方法

 第九步:启动 account 进行测试

  • 按顺序启动服务:

Eureka-->Seata Server-->Easy Id Generator-->Storage-->Account-->Order

  • 调用保存订单,地址:

http://localhost:8083/create?userId=1&productId=1&count=10&money=100

  • 观察 account 的控制台日志:

    a

  • 查看数据库表中的账户数据:

a

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/1011468
推荐阅读
相关标签
  

闽ICP备14008679号