当前位置:   article > 正文

RocketMQ快速入门:事务消息原理及实现(十)

RocketMQ快速入门:事务消息原理及实现(十)

0. 引言

rocketmq 的一大特性就是支持事务性消息,这在诸多场景中有所应用。在之前的文章中我们已经讲解过事务消息的使用,但事务消息是如何实现的呢? 今天我们就来探究其原理

1. 原理

rocketmq事务消息的基本流程分为三步:
(1)发送 half 消息到 broker。
(2)执行本地事务。
(3)根据本地事务执行结果:本地事务执行成功则将half消息标识为COMMIT,将消息进行提交,变成普通消息,可以被消费者消费;本地事务执行失败将half消息标识为ROLLBACK,并将消息从消息日志中删除,消费者则消费不到了。

从流程上可以看出rocketmq的事务消息,和数据库的事务有些类型,都有一个二阶段提交的概念。

rocketmq的half message(半消息)是一种特殊的消息,该种消息不会被消费者消费,但是可以被TransactionListener事务监听器获取到。

而我们就在事务监听器中书写自己的本地事务逻辑,本地事务执行成功后才将消息正常提交,这时提交后的消息才能被消费者消费到,否则就会回滚消息,消息就相当于从来没发送过。

这里需要注意的是,因为我们的本地事务逻辑已经在半消息接收的时候处理掉了,所以如果后续没有消费逻辑了,就不用再在消费者里书写逻辑,但如果还有后续的逻辑,就可以按照正常消费流程进行书写。
在这里插入图片描述

但是还有一个场景我们需要考虑,当本地事务执行完成后,在返回“执行成功(提交)”或“执行失败(回滚)”状态时,因为网络波动或者broker服务挂了,导致broker没有正常收到这个状态,从而无法及时把half message进行提交或回滚

这时就需要有个定时巡查机制,来检查这些没有正常收到提交状态的消息的实际状态到底是什么,这个巡查机制就是消息回查,也称为事务消息的补偿。在事务监听器TransactionListener中就是通过checkLocalTransaction方法来实现,executeLocalTransaction方法返回值一共有3种状态:

  • COMMIT_MESSAGE

提交状态,事务正常进行,一般是本地事务执行成功后进行设置。告知broker提交该事务消息,然后消费者可以消费该消息,当然此时消费者已经执行完本地事务了,再消费可以根据业务逻辑进行后续的逻辑处理,如果没有相关逻辑了忽略消息即可

  • ROLLBACK_MESSAGE

回滚状态,事务撤回,broker将删除当前half消息,一般是本地事务执行失败后进行设置

  • UNKNOW

未知状态,固定时间后Broker端会通过checkLocalTransaction方法进行消息回查,根据回查结果来判断该消息是提交还是回滚

示例代码:

@Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // TODO 执行本地事务(书写你自己的本地事务逻辑)
            try{
                String body = new String(msg.getBody());
                int i = Integer.parseInt(body);
                // 模拟偶数执行成功,奇数执行失败
                if(i % 2 == 0){
                    System.out.println("本地事务执行成功:"+body);
                    // 执行成功
                    return LocalTransactionState.COMMIT_MESSAGE;
                }else{
                    System.out.println("本地事务执行失败:"+body);
                    // 执行失败
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }

            }catch (Exception e){
                e.printStackTrace();
                // 执行失败
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

因此checkLocalTransaction方法中就要书写检查本地事务状态的方法,比如事务是对订单提交消息的消费,那么就去查询订单状态,如果已经是提交状态那么就返回COMMIT_MESSAGE,否则就返回ROLLBACK_MESSAGE

示例代码

       @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // TODO 去缓存或者数据库查询当前消息的实际状态

            // 模拟查询到状态为1
            Integer status = 1;

            // 不同实际状态对应的消息状态
            if (null != status) {
                switch (status) {
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2. 事务消息的实现

2.1 java client实现(适用于spring框架)

参考之前文章:https://wu55555.blog.csdn.net/article/details/138338692

2.2 springboot实现

参考之前文章:https://wu55555.blog.csdn.net/article/details/139741449

3. 总结

最后一句话总结一下,rocketmq的事务消息,是通过half消息(即二阶段提交)+回查机制来实现的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/775462
推荐阅读
相关标签
  

闽ICP备14008679号