赞
踩
TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作:
TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。
分支事务失败的情况:
以下数据采集日为2019-7-11,目前市面上的TCC框架众多比如下面这几种:
框架名称 | star数量 |
---|---|
tcc-transaction | 3850 |
Hmily | 2407 |
ByteTCC | 1947 |
EasyTransaction | 1690 |
上一节所讲的Seata也支持TCC,但Seata的TCC模式对Spring Cloud并没有提供支持。我们的目标是理解TCC的原理以及事务协调运作的过程,因此更请倾向于轻量级易于理解的框架,因此最终确定了Hmily。
Hmily是一个高性能分布式事务TCC开源框架。基于Java语言来开发(JDK1.8),支持Dubbo,Spring Cloud等 RPC框架进行分布式事务。它目前支持以下特性:
Hmily利用AOP对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的 调用到另一方的Try、Confirm、Cancel方法;传递事务上下文;并记录事务日志,酌情进行补偿,重试等。
Hmily不需要事务协调服务,但需要提供一个数据库(mysql/mongodb/zookeeper/redis/file)来进行日志存 储。
Hmily实现的TCC服务与普通的服务一样,只需要暴露一个接口,也就是它的Try业务。Confirm/Cancel业务 逻辑,只是因为全局事务提交/回滚的需要才提供的,因此Confirm/Cancel业务只需要被Hmily TCC事务框架 发现即可,不需要被调用它的其他业务服务所感知。
TCC需要注意三种异常处理分别是空回滚、幂等、悬挂:
举例,场景为 A 转账 30 元给 B,A和B账户在不同的服务。 方案1:
账户A
账户B
说明:
1)账户A,这里的余额就是所谓的业务资源,按照前面提到的原则,在第一阶段需要检查并预留业务资源,因此,我们在扣钱TCC资源的Try接口里先检查A账户余额是否足够,如果足够则扣除30元。Confirm接口表示正式提交,由于业务资源已经在Try接口里扣除掉了,那么在第二阶段的Confirm接口里可以什么都不用做。Cancel接口的执行表示整个事务回滚,账户A回滚则需要把Try接口里扣除掉的30元还给账户。
2)账号B,在第一阶段Try接口里实现给账户B加钱,Cancel接口的执行表示整个事务回滚,账户B回滚则需要把Try接口里加的30元再减去。
分析:
1)如果账户A的try没有执行在cancel则就多加了30元。
2)由于try,cancel、confirm都是由单独的线程去调用,且会出现重复调用,所以都需要实现幂等。
3)账号B在try中增加30元,当try执行完成后可能会其它线程给消费了。
4)如果账户B的try没有执行在cancel则就多减了30元。
解决:
1)账户A的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。
2)try,cancel、confirm方法实现幂等。
3)账号B在try方法中不允许更新账户金额,在confirm中更新账户金额。
4)账户B的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。
优化方案:
账户A
账户B
本实例通过Hmily实现TCC分布式事务,模拟两个账户的转账交易过程。两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账指定金额。上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。
数据库:MySQL-5.7.25
JDK:64位 jdk1.8.0_201
微服务:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
Hmily:hmily-springcloud.2.0.4-RELEASE
微服务及数据库的关系 :
dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 银行1,操作张三账户, 连接数据库bank1
dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2
服务注册中心:dtx/discover-server
导入数据库脚本:资料\sql\bank1.sql、资料\sql\bank2.sql、已经导过不用重复导入。 创建hmily数据库,用于存储hmily框架记录的数据。
CREATE DATABASE `hmily` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
创建bank1库,并导入以下表结构和数据(包含张三账户)
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名',
`account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号',
`account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码',
`account_balance` double DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (2,'张三','1',NULL,1000);
创建bank2库,并导入以下表结构和数据(包含李四账户)
CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码', `account_balance` double DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `account_info` */ insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (3,'李四的账户','2',NULL,0);
每个数据库都创建try、confirm、cancel三张日志表:
DROP TABLE IF EXISTS `local_cancel_log`; CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; DROP TABLE IF EXISTS `local_confirm_log`; CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; DROP TABLE IF EXISTS `local_trade_log`; CREATE TABLE `local_trade_log` ( `tx_no` bigint(20) NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
discover-server是服务注册中心,测试工程将自己注册至discover-server。
导入:资料\基础代码\dtx 父工程,此工程自带了discover-server,discover-server基于Eureka实现。已经导过不用重复导入。
dtx-tcc-demo是tcc的测试工程,根据业务需求需要创建两个dtx-tcc-demo工程。
(1)导入dtx-tcc-demo
导入:资料\基础代码\dtx-tcc-demo到父工程dtx下。 两个测试工程如下:
dtx/dtx-tcc-demo/dtx-tcc-demo-bank1银行1,操作张三账户,连接数据库bank1
dtx/dtx-tcc-demo/dtx-tcc-demo-bank2银行2,操作李四账户,连接数据库bank2
(2)引入maven依赖
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-springcloud</artifactId>
<version>2.0.4-RELEASE</version>
</dependency>
(3)配置hmily
application.yml:
org:
dromara:
hmily :
serializer : kryo
recoverDelayTime : 30
retryMax : 30
scheduledDelay : 30
scheduledThreadMax : 10
repositorySupport : db
started: true
hmilyDbConfig :
driverClassName : com.mysql.jdbc.Driver
url : jdbc:mysql://localhost:3306/hmily?useUnicode=true
username : root
password : mysql
新增配置类接收application.yml中的Hmily配置信息,并创建HmilyTransactionBootstrap Bean:
@Bean public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){ HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService); hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer")); hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime"))); hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax"))); hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay"))); hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax"))); hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport")); hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started"))); HmilyDbConfig hmilyDbConfig = new HmilyDbConfig(); hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName")); hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url")); hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username")); hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password")); hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig); return hmilyTransactionBootstrap; }
启动类增加@EnableAspectJAutoProxy并增加org.dromara.hmily的扫描项:
@SpringBootApplication
@EnableDiscoveryClient
@EnableHystrix
@EnableFeignClients(basePackages = {"cn.itcast.dtx.tccdemo.bank1.spring"})
@ComponentScan({"cn.itcast.dtx.tccdemo.bank1","org.dromara.hmily"})
public class Bank1TccServer {
public static void main(String[] args) {
SpringApplication.run(Bank1TccServer.class, args);
}
}
dtx-tcc-demo-bank1实现try和cancel方法,如下:
1)Dao
@Mapper @Component public interface AccountInfoDao { @Update("update account_info set account_balance=account_balance - #{amount} where account_balance>=#{amount} and account_no=#{accountNo} ") int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); @Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ") int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); /** * 增加某分支事务try执行记录 * @param localTradeNo 本地事务编号 * @return */ @Insert("insert into local_try_log values(#{txNo},now());") int addTry(String localTradeNo); @Insert("insert into local_confirm_log values(#{txNo},now());") int addConfirm(String localTradeNo); @Insert("insert into local_cancel_log values(#{txNo},now());") int addCancel(String localTradeNo); /** * 查询分支事务try是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_try_log where tx_no = #{txNo} ") int isExistTry(String localTradeNo); /** * 查询分支事务confirm是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ") int isExistConfirm(String localTradeNo); /** * 查询分支事务cancel是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ") int isExistCancel(String localTradeNo); }
2)try和cancel方法
package cn.itcast.dtx.tccdemo.bank1.service.impl; import cn.itcast.dtx.tccdemo.bank1.dao.AccountInfoDao; import cn.itcast.dtx.tccdemo.bank1.service.AccountInfoService; import cn.itcast.dtx.tccdemo.bank1.spring.Bank2Client; import lombok.extern.slf4j.Slf4j; import org.dromara.hmily.annotation.Hmily; import org.dromara.hmily.core.concurrent.threadlocal.HmilyTransactionContextLocal; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @author Administrator * @version 1.0 **/ @Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; @Autowired Bank2Client bank2Client; // 账户扣款,就是tcc的try方法 /** * try幂等校验 * try悬挂处理 * 检查余额是够扣减金额 * 扣减金额 * @param accountNo * @param amount */ @Override @Transactional //只要标记@Hmily就是try方法,在注解中指定confirm、cancel两个方法的名字 @Hmily(confirmMethod="commit",cancelMethod="rollback") public void updateAccountBalance(String accountNo, Double amount) { //获取全局事务id String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("bank1 try begin 开始执行...xid:{}",transId); //幂等判断 判断local_try_log表中是否有try日志记录,如果有则不再执行 if(accountInfoDao.isExistTry(transId)>0){ log.info("bank1 try 已经执行,无需重复执行,xid:{}",transId); return ; } //try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行 if(accountInfoDao.isExistConfirm(transId)>0 || accountInfoDao.isExistCancel(transId)>0){ log.info("bank1 try悬挂处理 cancel或confirm已经执行,不允许执行try,xid:{}",transId); return ; } //扣减金额 if(accountInfoDao.subtractAccountBalance(accountNo, amount)<=0){ //扣减失败 throw new RuntimeException("bank1 try 扣减金额失败,xid:{}"+transId); } //插入try执行记录,用于幂等判断 accountInfoDao.addTry(transId); //远程调用李四,转账 if(!bank2Client.transfer(amount)){ throw new RuntimeException("bank1 远程调用李四微服务失败,xid:{}"+transId); } if(amount == 2){ throw new RuntimeException("人为制造异常,xid:{}"+transId); } log.info("bank1 try end 结束执行...xid:{}",transId); } //confirm方法 @Transactional public void commit(String accountNo, Double amount){ //获取全局事务id String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("bank1 confirm begin 开始执行...xid:{},accountNo:{},amount:{}",transId,accountNo,amount); } /** cancel方法 * cancel幂等校验 * cancel空回滚处理 * 增加可用余额 * @param accountNo * @param amount */ @Transactional public void rollback(String accountNo, Double amount){ //获取全局事务id String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("bank1 cancel begin 开始执行...xid:{}",transId); // cancel幂等校验 if(accountInfoDao.isExistCancel(transId)>0){ log.info("bank1 cancel 已经执行,无需重复执行,xid:{}",transId); return ; } //cancel空回滚处理,如果try没有执行,cancel不允许执行 if(accountInfoDao.isExistTry(transId)<=0){ log.info("bank1 空回滚处理,try没有执行,不允许cancel执行,xid:{}",transId); return ; } // 增加可用余额 accountInfoDao.addAccountBalance(accountNo,amount); //插入一条cancel的执行记录 accountInfoDao.addCancel(transId); log.info("bank1 cancel end 结束执行...xid:{}",transId); } }
3)feignClient
/**
* Created by Administrator.
*/
@FeignClient(value="tcc-demo-bank2",fallback=Bank2ClientFallback.class)
public interface Bank2Client {
//远程调用李四的微服务
@GetMapping("/bank2/transfer")
@Hmily
public Boolean transfer(@RequestParam("amount") Double amount);
}
@RestController
public class Bank1Controller {
@Autowired
AccountInfoService accountInfoService;
@RequestMapping("/transfer")
public Boolean transfer(@RequestParam("amount") Double amount) {
this.accountInfoService.updateAccountBalance("1", amount);
return true;
}
}
dtx-tcc-demo-bank2实现如下功能:
1)Dao
package cn.itcast.dtx.tccdemo.bank2.dao; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Component; @Component @Mapper public interface AccountInfoDao { @Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ") int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); /** * 增加某分支事务try执行记录 * @param localTradeNo 本地事务编号 * @return */ @Insert("insert into local_try_log values(#{txNo},now());") int addTry(String localTradeNo); @Insert("insert into local_confirm_log values(#{txNo},now());") int addConfirm(String localTradeNo); @Insert("insert into local_cancel_log values(#{txNo},now());") int addCancel(String localTradeNo); /** * 查询分支事务try是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_try_log where tx_no = #{txNo} ") int isExistTry(String localTradeNo); /** * 查询分支事务confirm是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ") int isExistConfirm(String localTradeNo); /** * 查询分支事务cancel是否已执行 * @param localTradeNo 本地事务编号 * @return */ @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ") int isExistCancel(String localTradeNo); }
2)实现confifirm方法
package cn.itcast.dtx.tccdemo.bank2.service.impl; import cn.itcast.dtx.tccdemo.bank2.dao.AccountInfoDao; import cn.itcast.dtx.tccdemo.bank2.service.AccountInfoService; import lombok.extern.slf4j.Slf4j; import org.dromara.hmily.annotation.Hmily; import org.dromara.hmily.core.concurrent.threadlocal.HmilyTransactionContextLocal; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @author Administrator * @version 1.0 **/ @Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; @Override @Hmily(confirmMethod="confirmMethod", cancelMethod="cancelMethod") public void updateAccountBalance(String accountNo, Double amount) { //获取全局事务id String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("bank2 try begin 开始执行...xid:{}",transId); } /** * confirm方法 * confirm幂等校验 * 正式增加金额 * @param accountNo * @param amount */ @Transactional public void confirmMethod(String accountNo, Double amount){ //获取全局事务id String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("bank2 confirm begin 开始执行...xid:{}",transId); if(accountInfoDao.isExistConfirm(transId)>0){ log.info("bank2 confirm 已经执行,无需重复执行...xid:{}",transId); return ; } //增加金额 accountInfoDao.addAccountBalance(accountNo,amount); //增加一条confirm日志,用于幂等 accountInfoDao.addConfirm(transId); log.info("bank2 confirm end 结束执行...xid:{}",transId); } /** * @param accountNo * @param amount */ public void cancelMethod(String accountNo, Double amount){ //获取全局事务id String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("bank2 cancel begin 开始执行...xid:{}",transId); } }
3)Controller
package cn.itcast.dtx.tccdemo.bank2.controller; import cn.itcast.dtx.tccdemo.bank2.service.AccountInfoService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @author Administrator * @version 1.0 **/ @RestController public class Bank2Controller { @Autowired AccountInfoService accountInfoService; @RequestMapping("/transfer") public Boolean transfer(@RequestParam("amount") Double amount) { this.accountInfoService.updateAccountBalance("2", amount); return true; } }
如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。
而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。