当前位置:   article > 正文

RocketMQ系列6——事务消息_rocketmq事务消息

rocketmq事务消息

        RocketMQ在4.3.0版中支持分布式事务消息,这里RocketMQ的事务消息是采用2PC(两段式协议) +补偿机制(消息回查)的分布式事务功能。提供消息发送与业务落库的一致性。

一、事务流程

流程主要分为两个阶段:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:

(1) 生产者发送半事物消息到消息队列服务端。

(2) 消息队列服务端将消息持久化成功之后,返回Ack的确认发送成功的消息。也就是半事物消息发送成功。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)相关逻辑。

(4) 生产者根据本地事务状态结果向消息队列服务端提交二次确认结果,执行Commit(Commit操作服务端把半事物消息标记为可投递,并投递给消费者)或者Rollback(服务端把事物回滚,不会将半事务的消息投递给消费者)。

2.事务消息的补偿流程:

(5)对于没有Commit/Rollback的事务消息,经过国定的时间后,服务端会对消息生产者发起消息回查。

(6) Producer收到回查消息,检查回查消息对应的本地事务的状态。

(7) 根据本地事务状态,重新Commit或者Rollback。

半事务消息:暂时不能投递,生产者已经把消息放到了服务端。但是MQ没有收到生产者的二次确认,标记为"暂不能投递状态",属于半事务消息。

消息回查:网络中断,生产者重启,导致某个事务消息的二次确认丢失,mq查询到某条消息长期处于“暂不能投递状态”,查询生产者这个消息的最终状态(回滚或提交),询问过程叫消息回查。

二、事务消息设计

1.半事物消息对用户不可见原理

        half(半事物)消息,需要备份原消息的主题与消息消费队列,之后改变Topic名字为RMQ_SYS_TRANS_HALF_TOPIC。由于没有消费组订阅这个topic,所以消费端不会消费half类型的消息。RocketMQ中消息的服务端存储结构,Consumer通过ConsumeQueue这个二级索引来读取消息实体内容(commitLog)。

RocketMQ的具体实现策略:如果写入的是事务消息,对消息的Topic和Queue等属性进行替换,将原来的Topic和Queue信息存储到消息属性中。然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态的请求,根据事务状态决定是提交或者回滚消息。改变主题名称和延迟队列实现策略差不多。

2.Commit和Rollback操作和Op消息

        提交完成半事物消息之后,下一步如果是Rollback操作,需要撤销(由于RocketMq是顺序写入的,撤销不是真正的把这条消息删除)半事物消息,需要一个操作来标识这条消息的最终状态。RocketMQ事务消息引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或Rollback)。如果一个事务没有对应的Op消息,说明这个事务的状态还没有到达最终状态,就是二阶段还没有最终结果。下一步如果是Commit操作,需要让消息对消费者可见,Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

3.Op消息的存储和对应关系

        RocketMQ将Op消息写入到全局一个特定的Topic中(源码位置TransactionalMessageUtil.buildOpTopic()),这个Topic是一个内部Topic不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,通过Op消息能索引到Half消息进行后续的回查操作。

4.Half消息的索引构建

        执行Commit操作时,需要创建Half消息的索引。开始Half消息写入了一个特殊的Topic为RMQ_SYS_TRANS_HALF_TOPIC中,需要读取出来Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后写入到目标Topic生成一条对用户可见的消息。就相当于RocketMQ事务消息在Commit阶段,使用了Half消息存储的消息内容,在Commit阶段是恢复出完成的一条普通消息,走一边写入的流程。

5.处理二阶段失败的消息

        如果在RocketMQ事务消息的二阶段过程中失败了,比如Commit操作失败,出现网络问题之类的,需要通过补偿策略使这条消息最终被Commit。RocketMQ采用了一种补偿的机制“回查”。Broker端对于未确定状态的消息,在Broker配置文件中的参数transactionTimeout配置的特定时间长度之后发起回查,将消息发送到对应的Producer端,由Producer根据消息来检查本地事务的状态,之后在执行Commit或Rollback。Broker端根据Half消息和Op消息进行事务消息的回查并且记录那些事务消息的状态是确定的。RocketMQ默认回查15次(为了避免单个消息被检查太多次而导致半队列消息累积),可以通过Broker配置文件的transactionCheckMax参数修改,如果超过15次还没有获取到事务状态,默认回滚消息。

三、事务消息使用上的限制

  • 事务消息不支持延时消息和批量消息。
  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionalMessageCheckListener类来修改这个行为。
  • 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于transactionTimeout参数。
  • 事务性消息可能不止一次被检查或消费。
  • 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

四、事务java代码示例

  1. //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
  2. private static boolean checkOrderById(String orderId) {
  3. return true;
  4. }
  5. //演示demo,模拟本地事务的执行结果。
  6. private static boolean doLocalTransaction() {
  7. return true;
  8. }
  9. public static void main(String[] args) throws ClientException {
  10. ClientServiceProvider provider = new ClientServiceProvider();
  11. MessageBuilder messageBuilder = new MessageBuilder();
  12. //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
  13. Producer producer = provider.newProducerBuilder()
  14. .setTransactionChecker(messageView -> {
  15. /**
  16. * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
  17. * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
  18. */
  19. final String orderId = messageView.getProperties().get("OrderId");
  20. if (Strings.isNullOrEmpty(orderId)) {
  21. // 错误的消息,直接返回Rollback
  22. return TransactionResolution.ROLLBACK;
  23. }
  24. return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
  25. })
  26. .build();
  27. //开启事务分支。
  28. final Transaction transaction;
  29. try {
  30. transaction = producer.beginTransaction();
  31. } catch (ClientException e) {
  32. e.printStackTrace();
  33. //事务分支开启失败,直接退出。
  34. return;
  35. }
  36. Message message = messageBuilder.setTopic("topic")
  37. //设置消息索引键,可根据关键字精确查找某条消息。
  38. .setKeys("messageKey")
  39. //设置消息Tag,用于消费端根据指定Tag过滤消息。
  40. .setTag("messageTag")
  41. //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
  42. .addProperty("OrderId", "xxx")
  43. //消息体。
  44. .setBody("messageBody".getBytes())
  45. .build();
  46. //发送半事务消息
  47. final SendReceipt sendReceipt;
  48. try {
  49. sendReceipt = producer.send(message, transaction);
  50. } catch (ClientException e) {
  51. //半事务消息发送失败,事务可以直接退出并回滚。
  52. return;
  53. }
  54. /**
  55. * 执行本地事务,并确定本地事务结果。
  56. * 1. 如果本地事务提交成功,则提交消息事务。
  57. * 2. 如果本地事务提交失败,则回滚消息事务。
  58. * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
  59. *
  60. */
  61. boolean localTransactionOk = doLocalTransaction();
  62. if (localTransactionOk) {
  63. try {
  64. transaction.commit();
  65. } catch (ClientException e) {
  66. // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
  67. e.printStackTrace();
  68. }
  69. } else {
  70. try {
  71. transaction.rollback();
  72. } catch (ClientException e) {
  73. // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
  74. e.printStackTrace();
  75. }
  76. }
  77. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/622522
推荐阅读
相关标签
  

闽ICP备14008679号