赞
踩
之前我们说到,分布式事务是一个复杂的技术问题。没有通用的解决方案,也缺乏简单高效的手段。
不过,如果我们的系统不追求强一致性,那么最常用的还是最终一致性方案。今天,我们就基于 RocketMQ
来实现消息最终一致性方案的分布式事务。
本文代码不只是简单的demo,考虑到一些异常情况、幂等性消费和死信队列等情况,尽量向可靠业务场景靠拢。
另外,在最后还有《RocketMQ技术内幕》一书中,关于分布式事务示例代码的错误流程分析,所以篇幅较长,希望大家耐心观看。
在这里,笔者不想使用大量的文字赘述 RocketMQ
事务消息的原理,我们只需要搞明白两个概念。
暂时不能被 Consumer
消费的消息。Producer
已经把消息发送到 Broker
端,但是此消息的状态被标记为不能投递,处于这种状态下的消息称为半消息。事实上,该状态下的消息会被放在一个叫做 RMQ_SYS_TRANS_HALF_TOPIC
的主题下。
当 Producer
端对它二次确认后,也就是 Commit
之后,Consumer
端才可以消费到;那么如果是Rollback
,该消息则会被删除,永远不会被消费到。
我们想,可能会因为网络原因、应用问题等,导致Producer
端一直没有对这个半消息进行确认,那么这时候 Broker
服务器会定时扫描这些半消息,主动找Producer
端查询该消息的状态。
当然,什么时候去扫描,包含扫描几次,我们都可以配置,在后文我们再细说。
简而言之,RocketMQ
事务消息的实现原理就是基于两阶段提交和事务状态回查,来决定消息最终是提交还是回滚的。
在本文,我们的代码就以 订单服务、积分服务
为例。结合上文来看,整体流程如下:
在订单服务中,我们接收前端的请求创建订单,保存相关数据到本地数据库。
在订单服务中,除了有一张订单表之外,还需要一个事务日志表。
它的定义如下:
CREATE TABLE `transaction_log` (
`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '事务ID',
`business` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '业务标识',
`foreign_key` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '对应业务表中的主键',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
这张表专门作用于事务状态回查。当提交业务数据时,此表也插入一条数据,它们共处一个本地事务中。通过事务ID查询该表,如果返回记录,则证明本地事务已提交;如果未返回记录,则本地事务可能是未知状态或者是回滚状态。
我们知道,通过 RocketMQ
发送消息,需先创建一个消息发送者。值得注意的是,如果发送事务消息,在这里我们的创建的实例必须是 TransactionMQProducer
。
@Component public class TransactionProducer { private String producerGroup = "order_trans_group"; private TransactionMQProducer producer; //用于执行本地事务和事务状态回查的监听器 @Autowired OrderTransactionListener orderTransactionListener; //执行任务的线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50)); @PostConstruct public void init(){ producer = new TransactionMQProducer(producerGroup); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setSendMsgTimeout(Integer.MAX_VALUE); producer.setExecutorService(executor); producer.setTransactionListener(orderTransactionListener); this.start(); } private void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } //事务消息发送 public TransactionSendResult send(String data, String topic) throws MQClientException { Message message = new Message(topic,data.getBytes()); return this.producer.sendMessageInTransaction(message, null); } }
上面的代码中,主要就是创建事务消息的发送者。在这里,我们重点关注 OrderTransactionListener
,它负责执行本地事务和事务状态回查。
@Component public class OrderTransactionListener implements TransactionListener { @Autowired OrderService orderService; @Autowired TransactionLogService transactionLogService; Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { logger.info("开始执行本地事务...."); LocalTransactionState state; try{ String body = new String(message.getBody()); OrderDTO order = JSO
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。