赞
踩
业务场景中,我们可能会遇到消息队列顺序消费的情况,那么怎么保证顺序消费呢
一.首先 我们要保证消息投递的顺序性
1.RocketMQ中(RabbitMQ中同理)我们要确保我们需要顺序消费的消息进入同一个queue中
顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息
举例:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,
根据MessageQueueSelector里面自定义策略,根据同个业务id放置到同个queue里面,如订单号取模运算再放到selector中,同一个模的值都会投递到同一条queue
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- //如果是订单号是字符串,则进行hash,得到一个hash值
- Long id = (Long) arg;
- long index = id % mqs.size();
- return mqs.get((int)index);
- }
2.kafka中我们要确保我们的消息进入同一个partition中,消息投递的时候需要制定partition,代码中的1即是partition值
- producer.send(new ProducerRecord<>("mumu-topic",1,"zwt-key","zwt-value"), new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null){
- System.out.println("发送状态:"+metadata.toString());
- }else{
- exception.printStackTrace();
- }
- }
- });
二.其次 我们要保证消费端消费的顺序性
1.RocketMQ中(RabbitMQ中同理)应该使用MessageListenerOrderly,自带单线程消费消息,不能再Consumer端再使用多线程去消费,消费端分配到的queue数量是固定的,集群消费会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。
2.kafka中消费端消费组只能有一个节点,这样就确保了消息的有序性
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。