赞
踩
目录
目前在用户登录后调用初始化简历,其实这个过程是发生在两个不同的服务,本质上是在两个不同的分布式节点下进行的,这样的场景如果一旦简历服务发生异常,那么各自的事务是无法回滚的。(可以尝试在简历初始化的时候模拟一个除零异常)
举例电商中的场景经典的分布式事务使用场景:
三个链路,最后一个链路失败,之前的所有链路的事务是无法回滚的。因为每个服务自己的事务已经结束并且提交了,那么不同的节点是无法控制之前节点的事务的,所以事务是无法跨服务、无法跨分布式的,对于这样的事务,我们称之为分布式事务。
所以说,用户的一次请求会由多个不同的系统来协同完成,而请求的一次事务是涉及到了多个系统,这多个系统是分布式部署的,我们称之为分布式事务。
思考一下:如果数据库是多个,上述操作必定存在分布式事务。那么如果仅仅只有一个数据库,那么也会发生分布式事务吗?
回顾一下本地事务。 事务浅白点讲就是一连串的动作可以组成一个工作单元,这个工作单元具有如下几个特征:
本地事务是我们在进行单体应用架构时使用最多的一种数据库事务模式,由数据库来提供事务的支持,因为只有一个数据库,用户操作都是在一个工作单元中进行的,所以这也称之为本地事务。而且我们也都会借助数据库来完成事务的控制。
我们之前其实讲过了CAP定理,那么其实还有一个叫做BASE理论的玩意,这是对CAP的拓展。我们说过,现在主流的互联网项目,所采用的模式都是AP模式
,也就是可用性和分区容错性,而一致性呢,我们可以采取一定的手段,让他来达到最终一致性。那么BASE理论呢,其实就是针对一致性来说的。如下:
基本可用 Basically Available:分布式系统在出现故障出现异常的时候,可以允许损失一部分可用性,但是核心功能还是可以提供服务的。就像反浩克装甲那样,有些部件损坏了,但是机甲本身还是可用的,还可以正常打怪兽的。
软状态 Soft State:这是分布式系统中允许存在的中间状态,这个中间状态不会影响整个系统的可用性。什么意思呢,一个数据,在不同的系统中,他可能存在多个副本,也就是不一致性,这个不一致性是可以被允许的,分布式节点在数据同步的过程其实就是不一致的。阿里的一些系统,也是这么规定允许的,哪怕数据存在不一致性,也必须保证数据库以及整个网站的可用性。因为不一致性是中间状态,会被修改,哪怕是bug,也可以被修复,而数据库或者系统一旦出现问题导致不可用,那么损失的就是用户以及钱。
最终一致性 Enentual Consistency:这个是我们一直说的,我们并不要求数据在同一个时刻同一个工作单元内一致性,而是过一段时间,最终达到一致性,而这个一致性在业务上是可以被允许的。
强一致性:必须保证ACID这四个特性。比如银行转账就是非常典型的例子,其实只要和前有关的,就必须保证强一致性,哪怕不可用,中断交易,也必须保证多方的数据是一致的,因为钱很重要,是经济稳定的根本。(对吧,咱们一下子高度又拔高了~)所以我有时候在atm上存钱其实会很慢,因为他要做好很多的把控,不能少你钱也不能多你钱,有时候也会很慢,登录几分钟,甚至最后一步出现错误会导致你重新存钱。上一节课提到的本地事务,在单体架构、单个服务中的事务,都是强事务,强一致性。
弱一致性:隔离性无所谓,实现的是最终一致性。我下了订单,也付钱了,这些操作都成功了,但是订单我现在查不到,可能要等10分钟以后才会有,这是典型的高并发案例把,在双11的时候也非常常见,这就是弱一致性。也是互联网的常用手段。而转账的时候,钱转了,对方查询要等10分钟以后才有,那双发岂不是要煎熬10分钟?尤其买房交易的时候,多难受啊对吧,所以金融类交易必须是强一致性。而分布式系统中往往都是弱一致性。
2PC,也叫做2阶段协议提交,把咱们的分布式事务拆分为2个阶段。这两个阶段是由协调者和参与者组成的。
如下图:
处理过程:
举例:赛道短跑比赛,吹口哨的会说,预备。。。
这是第一个阶段,让大家就绪。再说跑。。。
,这个时候大家都跑了,如果不预备,可能有有没准备好导致滑到,那么大家又得重新来。
简单来说,2pc就是2个步骤,先准备,后提交,哪个步骤确认失败都会进行回滚。
需要注意,2PC的性能不好,因为事务资源管理器会占用大量资源,互联网高并发项目肯定不能用,金融类的没关系。
TCC就是事务补偿机制,try、confirm、cancel。这整个闭环是在业务层自己进行控制的。
使用tcc:
相对来说,tcc是柔性事务,他是最终一致性。而刚性事务呢,就是必须满足ACID各项特性,也就是强一致性。互联网项目中,使用tcc的还是偏多的。
最大努力通知,这种方案一般用于和第三方的对接,比如微信支付、支付宝支付。另外有一些兄弟公司或者同公司的兄弟项目,也会使用这种方式。在我们进行支付接口调用完毕后,最终到底支付失败还是成功,对方会给我们一个通知,这个通知是多次的,定时每隔几秒发一次通知,一般来说是8次,比如1s/10s/30s/60s/5m/10m/30m/……,多次发通知的目的其实就是让我们自己去对接过做好核对,不管成功还是失败都要做。这种方式的原理其实也就是通过mq异步发通知。如果是成功的通知,我们是否处理也无所谓,如果处理,则需要返回响应,说我知道了,你别再发了,对方则中断通知;如果失败,那么自己就需要做好失败的相关代码逻辑。(这块内容在后续对接微信的时候,也会有)
除此以外,是否成功失败的状态,第三方也提供了一个专门的接口,提供给我们查询,以防8次通知后 对方不发了,但是我们还没来得及处理,这个时候就需要我们自己手动主动去查询结果,最终再处理自己的成功或失败的业务。
所以说最大努力通知其实也可以称之为数据定期校对。最大努力,其实也就是事务发起方使劲浑身解数(比如:重试,轮训。。等操作)对数据进行校验,保证两头都是没问题的。我们自己公司也有类似的场景,就是数据回盘,我们会写到txt文本文件,然后和第三方公司接口的数据进行比对,如果不对,则该笔订单撤销。
最终一致性呢,就是把本地的多个事务进行拆分,拆分为各个子事务,中间使用消息队列进行异步协调来完成。
从图中可以看到,这里面并不像2pc和tcc那样,是有事务管理器的,采用消息队列并不需要TM。
所以对于这样的一种情况来说,最终必定是成功的,因为如果不成功,消息数据一直存在与数据库中,哪怕服务参与者无法处理该业务,一直抛出异常,或者宕机死机,那么再修复完毕以后,重新读取消息表,则还是依然可以处理数据的。如此一来,多端的服务最终都会达到一致性,虽然中间会有一定的延迟时间间隔,但是最终一致性的目的是可以达到的。
前面聊了一些常见的分布式事务方案,接下来我们所主要实现的是通过微服务的阿里组件,seata来实现微服务领域中的分布式事务。
https://github.com/seata/seata https://seata.io/zh-cn/docs/ops/deploy-guide-beginner.html
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
业务流程
这个XID,作为全局事务ID是贯穿整个分布式事务的过程的。 我们通过实例再来阐述一下流程: TC:项目经理 TM:产品经理 RM:程序员
从上面看的出来啊,这个TM产品经理就是搞事情的啊,做也是你,不做也是你。
选择我们所需要的版本
初始化sql脚本: https://github.com/seata/seata/blob/1.5.2/script/server/db/mysql.sql https://github.com/seata/seata/blob/1.5.2/script/client/at/db/mysql.sql
创建seata数据库,并且运行脚本,如下:
需要注意,undo_log这张表是属于业务库的,我们需要放入自己的数据库里作为客户端表。因为我们只有一个业务库,所以放一份。如果你有多个数据库对应到自己的微服务,则每一个库都需要放这张表。
这些表我们可以不用理会,是seata服务去进行使用的。
docker pull seataio/seata-server:1.5.2
docker run \
--name seata-server \
-p 8091:8091 \
-p 7091:7091 \
-d seataio/seata-server:1.5.2
mkdir /home/seata/resources -p
进入容器内部:
docker exec -it seata-server sh
这是配置文件列表:(早期会有两个配置文件,新版本里没有了,使用的是yml配置文件)
docker cp seata-server:/seata-server/resources /home/seata
修改配置文件application.yml
: 如下,这段配置可以直接从示例example中复制过来
以上分别是配置注册中心,配置中心,以及数据库存储配置
docker stop seata-server
docker rm seata-server
docker run \
--name seata-server \
-p 8091:8091 \
-p 7091:7091 \
-v /home/seata/resources:/seata-server/resources \
-d seataio/seata-server:1.5.2
为啥这么操作呢?为啥不一开始直接挂载呢,还少了一个步骤?因为我们需要他的配置文件,如果直接挂载,配置文件目录是为空的,需要重新下载源码包,再解压缩复制,步骤比较繁琐。
打开Nacos,检查结果:
如此安装配置成功!
Seata 服务端配置好以后,还需要再业务端,也就是咱们微服务节点里去进行配置。
api子工程pom中引入依赖,需要注意版本匹配:
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
- <exclusions>
- <exclusion>
- <groupId>io.seata</groupId>
- <artifactId>seata-spring-boot-starter</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>io.seata</groupId>
- <artifactId>seata-spring-boot-starter</artifactId>
- <version>1.5.2</version>
- </dependency>
需要注意,如果不排除,那么自带的seata版本是老版本,如此咱们在seata服务端的配置就失效了,因为版本不同配置方式已经更改了。
此处部分idea可能存在问,也就是以来的RM微服务端并没有覆盖原来的seata版本,如此会有问题,则需要在各自微服务的pom中添加如下,也就是再次覆盖:
- <dependency>
- <groupId>io.seata</groupId>
- <artifactId>seata-spring-boot-starter</artifactId>
- <version>1.5.2</version>
- </dependency>
配置本地事务所在微服务的seata配置,这两个文件一致即可。
重新运行接口,数据库没有数据,说明回滚成功
如果用的早期老版本 1.4 之前的,那么数据库是mysql8的话,datatime会有序列化问题,一定要注意,如果你现在用老版本,那么建议用mysql5或者mariadb。因为1.5里的jackson序列化支持对datatime的解析了。一定要注意。 此外。1.5和1.4的配置文件也完全不一样,有的同学安装1.4的方式安装1.5那肯定是不行的,会有坑。
很多时候,我们会有一些全局异常的捕获,但是这个时候,全局事务还会不会生效。
我们来测试如下几种情况:
这种情况和之前的一样,并没有改动代码。
结果你会发现全局事务回滚了。
这个时候,再次运行,观察数据库,我们发现数据新增了,导致两边不一致,如此分布式事务失效了,如下:
结果:用户数据新增了,简历数据由于异常本地回滚了。所以,全局事务没有触发,导致两边不一致。
原因分析:因为全局异常捕获了,被调方返回的是一个正常的响应数据,是经过处理的,所以发起方或者说seata就任务当前的远程调用是正常的,所以最终显示事务提交的信息,而不是会进行回滚。
所以大家思考一下,我们要不要添加全局异常的捕获?又或者说,我们一定要添加全局异常的捕获并且要实现分布式事务回滚,怎么办?大家课后思考一下。
创建切面:
创建全局事务
捕获到异常后回滚:
被调方也需要回滚:
运行结果:
回滚成功
我们现在的异常被全局捕获并且被统一处理,这种情况之下,我们会返回一个更加人性化的信息给到前端用户去看。按照我们以前的思路的确是这样。但是,我们现在是微服务的调用,这些优雅的包装信息给调用方,也就是服务的发起方来查看其实没有太大的意义,所以这个全局异常再这样的情况之下可以省去,是没有必要的。
我们现在所操作分布式事务的场景其实并不是金融案例。仔细想一想,用户注册完毕,对于一份空的简历而言,其实可有可无的,他们之间并没有像转账那样有强关联性。所以,我们这里完全可以通过mq来解耦,只要mq投递成功,那么后续操作无所谓的。所以此处我们完全可以使用mq来解耦。
只不过,解耦操作完毕以后,我们需要保证消息前后的一致性即可。所以这样的场景,我们也称之为最终一致性。
消息队列配置类: 建议直接拷贝以前的进行修改:
- @Configuration
- public class InitResumeMQConfig {
-
- // 定义交换机的名称
- public static final String INIT_RESUME_EXCHANGE = "init_resume_exchange";
-
- // 定义队列的名称
- public static final String INIT_RESUME_QUEUE = "init_resume_queue";
-
- // 统一定义路由key
- public static final String ROUTING_KEY_INIT_RESUME = "init.resume.display";
-
- // 创建交换机
- @Bean(INIT_RESUME_EXCHANGE)
- public Exchange exchange() {
- return ExchangeBuilder
- .topicExchange(INIT_RESUME_EXCHANGE)
- .durable(true)
- .build();
- }
-
- // 创建队列
- @Bean(INIT_RESUME_QUEUE)
- public Queue queue() {
- return QueueBuilder
- .durable(INIT_RESUME_QUEUE)
- .build();
- }
-
- // 创建绑定关系
- @Bean
- public Binding initResumeBinding(@Qualifier(INIT_RESUME_EXCHANGE) Exchange exchange,
- @Qualifier(INIT_RESUME_QUEUE) Queue queue) {
- return BindingBuilder
- .bind(queue)
- .to(exchange)
- .with("init.resume.#")
- .noargs();
- }
-
- }
UsersService中新增业务方法,用于测试MQ一致性:
UsersService实现:
此时的创建用户就是不同的插入操作了:
创建监听消费类:
yml添加配置:
为了测试方便,使用固定1234验证码则直接通过:
通过消息队列的监听来实现两边数据表的插入记录。
逆向工具生成消息表的entity、mapper等并复制到项目中(步骤略)。
创建生产者助手类:
保存消息到本地:
调用端代码,此时MQ消息不发送,先保存到数据库,然后再提交以后进行消息发送:
检查消息表有没保存到新纪录
创建我的自定义事务管理器
重写事务提交方法,提交后,不管怎样,执行并且发消息给消费端,让第二个微服务进行事务处理
增加批量查询的方法
打断点,测试流程是怎么走的。 观察消息的id以及消息内容是否ok。
消息一旦消费成功,则需要进行处理,把消息记录从本地数据库中删除。
开启MQ的手动ACK机制
成功则删除消息,并且ack确认,失败则重回队列:
可以看到,失败的消息,由于没有ack,他会一直存在于队列中,直到成功,那么会删除数据表中对应的消息,如此,本地消息数据表最终会清空。
观察日志:
apipost可以并发测试:
最终测试结束,消息表全部清空,最终一致性的目的达到
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。