当前位置:   article > 正文

RocketMQ - 如何实现事务消息_rocketmqlocaltransactionlistener

rocketmqlocaltransactionlistener

事务消息的使用场景

事务消息的使用场景很多,比如在电商系统中用户下单后新增了订单记录,对应的商品库存需要减少。怎么保证新增订单后商品库存减少呢?又例如红包业务,张三给李四发红包,张三的账户余额需要扣减,李四的账户余额需要增加,怎么保证张三账户扣减李四账户加钱呢?

这些问题都是事务问题,可以简单理解为:一个表的数据更新后,如何保证另外一个表的数据也更新成功。如果使用同一个数据库实例,那么问题很简单,可以使用本地事务来解决,Spring的@Transactional注解就支持。

但实际场景并不这么简单,互联网应用的流量大,系统规模通常也比较大,会存在许多数据库实例、分库分表等。我们需要修改的表往往不在同一个数据库实例或者同一个数据库中,此时就不能使用本地事务来解决了,需要用到分布式事务。RocketMQ的一大特点就是支持事务消息,支持一些分布式事务场景。

如何发送事务消息

我们使用RocketMQ事务消息来模拟下单减库存的场景,代码仅包含核心功能代码

  1. 发送订单的事务消息,预提交
@RestController
public class TransactionalController {

    @Autowired
    private Source source;

    public String transactional() {
        Order order = new Order("123", "test");
        String transactionId = UUID.randomUUID().toString();
        MessageBuilder builder = MessageBuilder.withPayload(order).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId);
        Message message = builder.build();
        source.output().send(message);
        return "OK";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
server.port=8081
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
  • 1
  • 2
  • 3
  • 4

Order对象保存了订单信息,随机生成一个ID作为消息的事务ID。此时消息已经发送到Broker中,但还未投递出去,Consumer暂时还不能消费这条消息。
2. 执行订单信息入库的事务操作,提交或回滚事务消息

@RocketMQTransactionListener(txProducerGroup = "OrderTransactionGroup")
public class TransactionalMsgListener implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            // 获取前面生成的事务ID
            String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            // 以事务ID为主键,执行本地事务
            Order order = (Order) message.getPayload();
            boolean result = this.saveOrder(order, transactionId);
            return result ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    private boolean saveOrder(Order order, String transactionId) {
        // 将事务ID设置为唯一键
        // 调用数据库Insert into 订单表
        return true;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // 获取事务ID
        String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        // 以事务ID为主键,查询本地事务执行情况
        if (isSuccess(transactionId)) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }

    private boolean isSuccess(String transactionId) {
        // 查询数据库 select from 订单表
        return true;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

实现RocketMQLocalTransactionListener接口,使用@RocketMQTransactionListener注解用于接收本地事务的监听,txProducerGroup是事务组名称,RocketMQLocalTransactionListener接口有两个实现方法:

  • executeLocalTransaction:执行本地事务,在第一步中消息发送成功会回调执行,一旦事务提交成功,下游应用的Consumer能收到该消息,在这里demo的本地事务就是保存订单信息入库
  • checkLocalTransaction:检查本地事务执行状态,如果executeLocalTransaction方法中返回的状态是未知UNKNOWN或者未返回状态,默认会在预处理发送的1分钟后由Broker通知Producer检查本地事务,在Producer中回调本地事务监听器中的checkLocalTransaction方法。检查本地事务时,可以根据事务ID查询本地事务的状态,再返回具体事务状态给Broker。
  1. 消费订单消息
@EnableBinding({Sink.class})
@SpringBootApplication
public class App 
{
    public static void main( String[] args )
    {
        SpringApplication.run(App.class);
    }

    @StreamListener(Sink.INPUT)
    public void receive(String msg) {
        System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
server.port=8091
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.input.destination=TopicTest
spring.cloud.stream.bindings.input.group=test-group1
  • 1
  • 2
  • 3
  • 4

消费事务消息与消费普通消息的代码是一样的,无需做任何修改。

事务消息的技术原理

RocketMQ采用了2PC的方案来提交事务消息。第一阶段Producer向Broker发送预处理消息,此时消息还未被投递出去,Consumer还不能消费;第二阶段Producer向Broker发送提交或者回滚消息。具体流程如下:

  • 发送预处理消息成功后,开始执行本地事务。
  • 如果本地事务执行成功,发送提交请求提交事务消息,消息会投递给Consumer
    在这里插入图片描述
  • 如果本地事务执行失败,发送回滚请求回滚事务消息,消息不会投递给Consumer
    在这里插入图片描述
  • 如果本地事务状态未知,网络故障或Producer宕机,Broker未收到二次确认的消息。由Broker端发送请求给Producer进行消息回查,确认提交或回滚。如果消息状态一致未被确认,需要人工介入处理。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/622525
推荐阅读
相关标签
  

闽ICP备14008679号