赞
踩
事务消息的使用场景很多,比如在电商系统中用户下单后新增了订单记录,对应的商品库存需要减少。怎么保证新增订单后商品库存减少呢?又例如红包业务,张三给李四发红包,张三的账户余额需要扣减,李四的账户余额需要增加,怎么保证张三账户扣减李四账户加钱呢?
这些问题都是事务问题,可以简单理解为:一个表的数据更新后,如何保证另外一个表的数据也更新成功。如果使用同一个数据库实例,那么问题很简单,可以使用本地事务来解决,Spring的@Transactional注解就支持。
但实际场景并不这么简单,互联网应用的流量大,系统规模通常也比较大,会存在许多数据库实例、分库分表等。我们需要修改的表往往不在同一个数据库实例或者同一个数据库中,此时就不能使用本地事务来解决了,需要用到分布式事务。RocketMQ的一大特点就是支持事务消息,支持一些分布式事务场景。
我们使用RocketMQ事务消息来模拟下单减库存的场景,代码仅包含核心功能代码
@RestController
public class TransactionalController {
@Autowired
private Source source;
public String transactional() {
Order order = new Order("123", "test");
String transactionId = UUID.randomUUID().toString();
MessageBuilder builder = MessageBuilder.withPayload(order).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId);
Message message = builder.build();
source.output().send(message);
return "OK";
}
}
server.port=8081
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
Order对象保存了订单信息,随机生成一个ID作为消息的事务ID。此时消息已经发送到Broker中,但还未投递出去,Consumer暂时还不能消费这条消息。
2. 执行订单信息入库的事务操作,提交或回滚事务消息
@RocketMQTransactionListener(txProducerGroup = "OrderTransactionGroup") public class TransactionalMsgListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { // 获取前面生成的事务ID String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); // 以事务ID为主键,执行本地事务 Order order = (Order) message.getPayload(); boolean result = this.saveOrder(order, transactionId); return result ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } private boolean saveOrder(Order order, String transactionId) { // 将事务ID设置为唯一键 // 调用数据库Insert into 订单表 return true; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { // 获取事务ID String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); // 以事务ID为主键,查询本地事务执行情况 if (isSuccess(transactionId)) { return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.ROLLBACK; } private boolean isSuccess(String transactionId) { // 查询数据库 select from 订单表 return true; } }
实现RocketMQLocalTransactionListener接口,使用@RocketMQTransactionListener注解用于接收本地事务的监听,txProducerGroup是事务组名称,RocketMQLocalTransactionListener接口有两个实现方法:
@EnableBinding({Sink.class})
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
@StreamListener(Sink.INPUT)
public void receive(String msg) {
System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
}
server.port=8091
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.input.destination=TopicTest
spring.cloud.stream.bindings.input.group=test-group1
消费事务消息与消费普通消息的代码是一样的,无需做任何修改。
RocketMQ采用了2PC的方案来提交事务消息。第一阶段Producer向Broker发送预处理消息,此时消息还未被投递出去,Consumer还不能消费;第二阶段Producer向Broker发送提交或者回滚消息。具体流程如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。