赞
踩
RocketMQ在4.3.0版中支持分布式事务消息,这里RocketMQ的事务消息是采用2PC(两段式协议) +补偿机制(消息回查)的分布式事务功能。提供消息发送与业务落库的一致性。
流程主要分为两个阶段:正常事务消息的发送及提交、事务消息的补偿流程。
1.事务消息发送及提交:
(1) 生产者发送半事物消息到消息队列服务端。
(2) 消息队列服务端将消息持久化成功之后,返回Ack的确认发送成功的消息。也就是半事物消息发送成功。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)相关逻辑。
(4) 生产者根据本地事务状态结果向消息队列服务端提交二次确认结果,执行Commit(Commit操作服务端把半事物消息标记为可投递,并投递给消费者)或者Rollback(服务端把事物回滚,不会将半事务的消息投递给消费者)。
2.事务消息的补偿流程:
(5)对于没有Commit/Rollback的事务消息,经过国定的时间后,服务端会对消息生产者发起消息回查。
(6) Producer收到回查消息,检查回查消息对应的本地事务的状态。
(7) 根据本地事务状态,重新Commit或者Rollback。
半事务消息:暂时不能投递,生产者已经把消息放到了服务端。但是MQ没有收到生产者的二次确认,标记为"暂不能投递状态",属于半事务消息。
消息回查:网络中断,生产者重启,导致某个事务消息的二次确认丢失,mq查询到某条消息长期处于“暂不能投递状态”,查询生产者这个消息的最终状态(回滚或提交),询问过程叫消息回查。
half(半事物)消息,需要备份原消息的主题与消息消费队列,之后改变Topic名字为RMQ_SYS_TRANS_HALF_TOPIC。由于没有消费组订阅这个topic,所以消费端不会消费half类型的消息。RocketMQ中消息的服务端存储结构,Consumer通过ConsumeQueue这个二级索引来读取消息实体内容(commitLog)。
RocketMQ的具体实现策略:如果写入的是事务消息,对消息的Topic和Queue等属性进行替换,将原来的Topic和Queue信息存储到消息属性中。然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态的请求,根据事务状态决定是提交或者回滚消息。改变主题名称和延迟队列实现策略差不多。
提交完成半事物消息之后,下一步如果是Rollback操作,需要撤销(由于RocketMq是顺序写入的,撤销不是真正的把这条消息删除)半事物消息,需要一个操作来标识这条消息的最终状态。RocketMQ事务消息引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或Rollback)。如果一个事务没有对应的Op消息,说明这个事务的状态还没有到达最终状态,就是二阶段还没有最终结果。下一步如果是Commit操作,需要让消息对消费者可见,Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
RocketMQ将Op消息写入到全局一个特定的Topic中(源码位置TransactionalMessageUtil.buildOpTopic()),这个Topic是一个内部Topic不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,通过Op消息能索引到Half消息进行后续的回查操作。
执行Commit操作时,需要创建Half消息的索引。开始Half消息写入了一个特殊的Topic为RMQ_SYS_TRANS_HALF_TOPIC中,需要读取出来Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后写入到目标Topic生成一条对用户可见的消息。就相当于RocketMQ事务消息在Commit阶段,使用了Half消息存储的消息内容,在Commit阶段是恢复出完成的一条普通消息,走一边写入的流程。
如果在RocketMQ事务消息的二阶段过程中失败了,比如Commit操作失败,出现网络问题之类的,需要通过补偿策略使这条消息最终被Commit。RocketMQ采用了一种补偿的机制“回查”。Broker端对于未确定状态的消息,在Broker配置文件中的参数transactionTimeout配置的特定时间长度之后发起回查,将消息发送到对应的Producer端,由Producer根据消息来检查本地事务的状态,之后在执行Commit或Rollback。Broker端根据Half消息和Op消息进行事务消息的回查并且记录那些事务消息的状态是确定的。RocketMQ默认回查15次(为了避免单个消息被检查太多次而导致半队列消息累积),可以通过Broker配置文件的transactionCheckMax参数修改,如果超过15次还没有获取到事务状态,默认回滚消息。
- //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
- private static boolean checkOrderById(String orderId) {
- return true;
- }
- //演示demo,模拟本地事务的执行结果。
- private static boolean doLocalTransaction() {
- return true;
- }
- public static void main(String[] args) throws ClientException {
- ClientServiceProvider provider = new ClientServiceProvider();
- MessageBuilder messageBuilder = new MessageBuilder();
- //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
- Producer producer = provider.newProducerBuilder()
- .setTransactionChecker(messageView -> {
- /**
- * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
- * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
- */
- final String orderId = messageView.getProperties().get("OrderId");
- if (Strings.isNullOrEmpty(orderId)) {
- // 错误的消息,直接返回Rollback。
- return TransactionResolution.ROLLBACK;
- }
- return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
- })
- .build();
- //开启事务分支。
- final Transaction transaction;
- try {
- transaction = producer.beginTransaction();
- } catch (ClientException e) {
- e.printStackTrace();
- //事务分支开启失败,直接退出。
- return;
- }
- Message message = messageBuilder.setTopic("topic")
- //设置消息索引键,可根据关键字精确查找某条消息。
- .setKeys("messageKey")
- //设置消息Tag,用于消费端根据指定Tag过滤消息。
- .setTag("messageTag")
- //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
- .addProperty("OrderId", "xxx")
- //消息体。
- .setBody("messageBody".getBytes())
- .build();
- //发送半事务消息
- final SendReceipt sendReceipt;
- try {
- sendReceipt = producer.send(message, transaction);
- } catch (ClientException e) {
- //半事务消息发送失败,事务可以直接退出并回滚。
- return;
- }
- /**
- * 执行本地事务,并确定本地事务结果。
- * 1. 如果本地事务提交成功,则提交消息事务。
- * 2. 如果本地事务提交失败,则回滚消息事务。
- * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
- *
- */
- boolean localTransactionOk = doLocalTransaction();
- if (localTransactionOk) {
- try {
- transaction.commit();
- } catch (ClientException e) {
- // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
- e.printStackTrace();
- }
- } else {
- try {
- transaction.rollback();
- } catch (ClientException e) {
- // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
- e.printStackTrace();
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。