赞
踩
阅读本文后,我们将了解以上两个问题的答案。
1、RocketMq生产者组(producer group)的设定有什么用?
2、一个订单处理的场景,消费者订阅了订单topic,但总担心丢消息。于是为了防止丢消息,每天都会通过定时任务做一致性校验,这是最佳方案吗?
这还得从基于消息队列实现实现分布式事务
的解决方案说起。
常见的分布式事务包括 2PC、3PC、TCC、本地消息表、消息事务、最大努力通知。
今天我们主要讲讲本地消息表
、消息事务
两种方案。
本地消息表的大致流程是这样的。
public class OrderService {
@Transactional
public Map createOrder() {
Map result = new HashMap<>();
// 1、执行下订单业务流程,插入订单表
// 2、生成事务消息的唯一ID,将事务ID组装到消息体中
// 3、将消息插入到本地消息表中。 消息包括tran_id,消息内容,状态:待发送,创建时间
// 4、返回结果,结束事务。 中间出错,则回滚事务
return result;
}
}
我们通过本地数据库事务,保证了业务逻辑与插消息本地表的原子性。
后台再起一个定时任务,扫描待发送的消息,发送到Mq中。发送成功则更新状态为已发送,发送失败则进行重试。
接下来我们会以RocketMq为例介绍它对消息事务的实现,并结合实战操作。
因为RocketMq很好地实现了消息事务(4.3之后支持)
,保证了我们平时难搞的,本地事务与发送消息的原子性。采用的思想是2PC,两阶段提交
来替代了本地消息表,并通过事务结果回查
解决消息发送失败、客户端宕机等极端场景。
2PC是解决分布式事务的一种思想,不同系统在实现上会有一定差别。它的思想有有一个协调者
,决定事务执行的两端是提交事务/还是回滚事务。
那么RocketMq具体是怎么实现的呢?
简单地说RocketMq是通过:
保证两端业务的事务一致性的。
首先消费端的ack机制好理解,RocketMq可以保证已经持久化的消息至少被成功消费一次
。也就是说消费端的事务一致性是可以保证的。即使消费端宕机了,那么重启之后再次消费依然可以保证一致。
生产端是RocketMq事务消息的核心。当生产端需要执行事务时,首先会向Mq发送一个半消息。
为什么叫半消息呢?首先肯定不是消息的一半,它指的是发送的消息是无法被消费者直接消费的,消息是发送到了系统内置的另外一个Topic中RMQ_SYS_TRANS_HALF_TOPIC
。
1、事务发起方向Broker发送一个半消息,半消息会包含唯一事务Id,topic、queueId、业务消息内容、事务标志。broker在收到事务标志TRANSACTION_PREPARED_TYPE后会备份原有消息的topic与queueId,便于事务成功提交后正常投递。
2、半消息发送成功后
3、执行本地事务逻辑回调,根据执行结果返回 COMMIT(提交),ROLLBACK(回滚),UNKNOW(未知)
4、Broker根据本地事务执行结果选择提交消息、回滚消息。
5、如果是提交消息,Broker会把原来对应tran_id的半消息查询出来,并恢复topic与queueId,投递到对应topic中。如果是回滚,则丢弃消息。
6、消费端接收到消息之后,执行本地事务。
7、如果成功消费,调整consumeOffset。如果消费失败,通过ack机制进行消息重发。
在上述第3步,如果执行本地事务后,结果没有正常回传到Broker(网络异常、服务宕机),那么Broker就不知道是需要提交还是回滚。此时broker会选择事务消息对应的生产者组
中任意一台生产者服务,定时发起事务状态回查,查询的依据是发消息时自动生成的唯一事务Id
,根据本地事务状态来决定是提交还是回滚。
这里借助RocketMq中源码中的示例org.apache.rocketmq.example.transaction.TransactionProducer,对事务消息做一个使用说明。
public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { // 事务监听器 TransactionListener transactionListener = new TransactionListenerImpl(); // 生产者 TransactionMQProducer producer = new TransactionMQProducer("transaction_group1"); // 事务状态回查用的线程池 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); // 发送事务消息 for (int i = 0; i < 1; i++) { try { Message msg = new Message("TopicTest1234", "TagA", "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s,半消息发送结果%s%n",new Date(System.currentTimeMillis()),sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } // 让主线程等待,以便执行事务状态回查 for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }
发送事务消息与普通消息区别不大,主要就是设置了TransactionListener与ExecutorService(事务回查线程池),以及在发送消息时改为调用sendMessageInTransaction。
这里的关键在于TransactionListener的实现类。其中封装了执行本地事务
与消息状态回查
的逻辑。
public interface TransactionListener {
// 执行本地事务,arg由sendMessageInTransaction传入
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
// 回查事务状态
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
我们看下具体实现:
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(1); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 模拟本地事务执行,并将结果存入localTransMap中 System.out.printf("%s执行本地事务msg:%s%n,arg:%s\n", new Date(System.currentTimeMillis()),msg.toString(), arg); int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); // 返回Unknown,触发事务回查 return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据msg.getTransactionId()回查事务状态,来决定是提交还是回滚 System.out.printf("%s,事务结果回查%s%n\n", new Date(System.currentTimeMillis()),msg.toString()); Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
日志结果:
Mon Sep 06 22:50:15 CST 2021执行本地事务msg:Message{topic='TopicTest1234', flag=0, properties={KEYS=KEY0, TRAN_MSG=true, UNIQ_KEY=7F000001CAD818B4AAC21EA64D4D0000, WAIT=true, PGROUP=transaction_group1, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='7F000001CAD818B4AAC21EA64D4D0000'}
,arg:null
Mon Sep 06 22:50:15 CST 2021,半消息发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001CAD818B4AAC21EA64D4D0000, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTest1234, brokerName=broker-a, queueId=3], queueOffset=59]
Mon Sep 06 22:51:02 CST 2021,事务结果回查MessageExt [brokerName=null, queueId=3, storeSize=317, queueOffset=60, sysFlag=0, bornTimestamp=1630939815246, bornHost=/127.0.0.1:65407, storeTimestamp=1630939815262, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F00000000001B8B68, commitLogOffset=1805160, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest1234', flag=0, properties={REAL_TOPIC=TopicTest1234, TRANSACTION_CHECK_TIMES=1, KEYS=KEY0, TRAN_MSG=true, UNIQ_KEY=7F000001CAD818B4AAC21EA64D4D0000, CLUSTER=DefaultCluster, PGROUP=transaction_group1, WAIT=false, TAGS=TagA, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='7F000001CAD818B4AAC21EA64D4D0000'}]
从日志中知道,我们发送的MsgId为:7F00000100002A9F00000000001B8B68,Topic: TopicTest1234
进入RocketMq监控控制台:
该测试的场景是第一次执行本地事务,返回Unknown,事务状态第一次回查后,返回Commit。
所以我们可以看到22:50:15的半消息,没有问题,这是我们手动发送的。
那22:51:02的消息是怎么回事呢?这个时间刚好与Commit到TopicTest1234时间一致。
其实这是事务状态回查的一个机制。为了保证回查消息的推进,在每次事务回查发起前,会先往RMQ_SYS_TRANS_HALF_TOPIC
主题里面再次发送回查消息。这样可以保证,如果回查失败,下次回查任务可以通过前面发送的回查消息继续。
可以参考:Rocket技术内幕作者的一篇博客
我们知道,RocketMq和Kafka都支持事务,那么这两者实现的事务有什么区别呢?
简单地讲:
@Transactional
public Map createOrder() {
Map result = new HashMap<>();
// 1、执行下订单业务流程,插入订单表
// 2、发送事务消息
return result;
}
@Transactional public Map createOrder() { Map result = new HashMap<>(); producer.initTransaction(); try{ // 开启事务 producer.beginTransaction(); // 发送多条消息 producer.send(msg1); producer.send(msg2); producer.send(msg3); // 提交事务 producer.commitTransaction(); }catch (KafkaException e) { producer.abortTransaction(); } return result; }
所以说,rocketMq的事务更符合我们平时理解的事务的概念。
1、RocketMq生产者组(producer group)的设定有什么用?
逻辑概念
,他们共同生产一批Topic。运维工具查询
一组生产者生产者组中任意一台
生产者查询事务状态。2、一个订单处理的场景,消费者订阅了订单topic,但总担心丢消息。于是为了防止丢消息,每天都会通过定时任务做一致性校验,这是最佳方案吗?
事务消息
,可以保证订单入库与消息发送的原子性,消息在生产端不会丢失。重试机制
保证消费者成功消费。Acl权限控制
,保障消息生产消费的安全性。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。