当前位置:   article > 正文

RabbitMQ如何保证顺序消费_rabbitmq保证顺序消费

rabbitmq保证顺序消费

⼀. 场景介绍:


很多时候,消息的消费是不⽤保证顺序的,⽐如借助mq实现订单超时的处理。但有些时候,业务中可
能会存在多个消息需要顺序处理的情况,⽐如⽣成订单和扣减库存消息,那肯定是先执⾏⽣成订单的
操作,再执⾏扣减库存的操作。
那么这种情况下,是如何保证消息顺序消费的呢?
⾸先,为了效率,我们可以设置多个队列都来处理顺序执⾏的消息。另外,我们需要保证每组顺序消
费的消息发到同⼀个队列中,给这些消息设置⼀个统⼀的全局id即可。
其次,保证消息的顺序消费。就像上⾯所说,⼀个队列对应⼀个消费者即可,但是在项⽬的集群部署
下,这⼜该怎么处理呢?针对这种情况,我们可以设置队列的“单活模式”。
x-single-active-consumer:单活模式,表⽰是否最多只允许⼀个消费者消费,如果有多个
消费者同时绑定,则只会激活第⼀个,除⾮第⼀个消费者被取消或者死亡,才会⾃动转到下
⼀个消费者。

⼆. 模拟代码实现

假设现在我们有两个队列处理顺序消息(消息1-1和1-2属于⼀组需要顺序消费的消息,消息2-1和2-2属
于另⼀组需要顺序消费的消息),每个队列有两个消费者(模拟消费者集群)。
1. 队列的配置类
  1. 1 package com.qfedu.springbootmq.sequence.config;
  2. 2 import org.springframework.amqp.core.Binding;
  3. 3 import org.springframework.amqp.core.BindingBuilder;
  4. 4 import org.springframework.amqp.core.DirectExchange;
  5. 5 import org.springframework.amqp.core.Queue;
  6. 6 import org.springframework.context.annotation.Bean;
  7. 7 import org.springframework.context.annotation.Configuration;
  8. 8 import java.util.HashMap;
  9. 9
  10. 10 @Configuration
  11. 11 public class SeqQueueConfiguration {
  12. 12 /**
  13. 13 * 创建两个队列,处理顺序消息
  14. 14 */
  15. 15 @Bean
  16. 16 public Queue seqQueue1() {
  17. 17 return creatQueue("q_seq1");
  18. 18 }
  19. 19
  20. 20 @Bean
  21. 21 public Queue seqQueue2() {
  22. 22 return creatQueue("q_seq2");
  23. 23 }
  24. 24 // 交换机
  25. 25 @Bean
  26. 26 public DirectExchange seqDirectExchange() {
  27. 27 return new DirectExchange("direct_seq");
  28. 28 }
  29. 29
  30. 30 // 队列绑定交换机,执⾏路由key
  31. 31 @Bean
  32. 32 public Binding seqBinding1() {
  33. 33 return BindingBuilder.bind(seqQueue1()).to(seqDirectExchange()).with("1"
  34. 34 }
  35. 35
  36. 36 @Bean
  37. 37 public Binding seqBinding2() {
  38. 38 return BindingBuilder.bind(seqQueue2()).to(seqDirectExchange()).with("2"
  39. 39 }
  40. 40
  41. 41 /**
  42. 42 * 创建⼀个 单活模式的队列
  43. 43 * @param name
  44. 44 * @return queue
  45. 45 */
  46. 46 private Queue creatQueue(String name) {
  47. 47 HashMap<String, Object> args = new HashMap<>();
  48. 48 // x-single-active-consumer 单活模式 队列
  49. 49 // 表⽰是否最多只允许⼀个消费者消费,如果有多个消费者同时绑定,则只会激活第⼀个,
  50. 50 // 除⾮第⼀个消费者被取消或者死亡,才会⾃动转到下⼀个消费者。
  51. 51 args.put("x-single-active-consumer", true);
  52. 52 return new Queue(name, true, false, false, args);
  53. 53 }
  54. 54 };
2. ⽣产者
  1. 1 package com.qfedu.springbootmq.sequence.producer;
  2. 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
  3. 3 import lombok.extern.slf4j.Slf4j;
  4. 4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. 5 import org.springframework.stereotype.Component;
  6. 6 import javax.annotation.Resource;
  7. 7
  8. 8 @Component
  9. 9 public class ProducerSeq {
  10. 10 @Resource
  11. 11 private RabbitTemplate rabbitTemplate;
  12. 12
  13. 13 /**
  14. 14 * 根据id,将消息顺序发送到对应的队列
  15. 15 * @param id 业务id
  16. 16 * @param msg 业务信息
  17. 17 */
  18. 18 public void send(int id, String msg) {
  19. 19 MessageInfo message = new MessageInfo(id, msg);
  20. 20 rabbitTemplate.convertAndSend("direct_seq", String.valueOf(id % 2 + 1),
  21. 21 }
  22. 22 }
3.1  消费者1的代码实现:
  1. 1 package com.qfedu.springbootmq.sequence.consumer;
  2. 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
  3. 3 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. 4 import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. 5 import org.springframework.stereotype.Component;
  6. 6 import java.util.Random;
  7. 7
  8. 8 @Component
  9. 9 @RabbitListener(queues = "q_seq1")
  10. 10 public class Consumer11 {
  11. 11 @RabbitHandler
  12. 12 public void onMessage(MessageInfo message) {
  13. 13 System.out.println("c11:" + message.getId() + ":" + message.getMsg());
  14. 14 // 随机休眠
  15. 15 long l = new Random(1000).nextLong();
  16. 16 try {
  17. 17 Thread.sleep(l);
  18. 18 } catch (InterruptedException e) {
  19. 19 e.printStackTrace();
  20. 20 }
  21. 21 }
  22. 22 }
3.2  消费者2的代码实现:
  1. 1 package com.qfedu.springbootmq.sequence.consumer;
  2. 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
  3. 3 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. 4 import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. 5 import org.springframework.stereotype.Component;
  6. 6 import java.util.Random;
  7. 7
  8. 8 @Component
  9. 9 @RabbitListener(queues = "q_seq1")
  10. 10 public class Consumer12 {
  11. 11 @RabbitHandler
  12. 12 public void onMessage(MessageInfo message) {
  13. 13 System.out.println("c12:" + message.getId() + ":" + message.getMsg());
  14. 14 // 随机休眠
  15. 15 long l = new Random(1000).nextLong();
  16. 16 try {
  17. 17 Thread.sleep(l);
  18. 18 } catch (InterruptedException e) {
  19. 19 e.printStackTrace();
  20. 20 }
  21. 21 }
  22. 22 }
3.3  消费者3的代码实现:
  1. 1 package com.qfedu.springbootmq.sequence.consumer;
  2. 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
  3. 3 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. 4 import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. 5 import org.springframework.stereotype.Component;
  6. 6 import java.util.Random;
  7. 7
  8. 8 @Component
  9. 9 @RabbitListener(queues = "q_seq2")
  10. 10 public class Consumer21 {
  11. 11 @RabbitHandler
  12. 12 public void onMessage(MessageInfo message) {
  13. 13 System.out.println("c21:" + message.getId() + ":" + message.getMsg());
  14. 14 // 随机休眠
  15. 15 long l = new Random(1000).nextLong();
  16. 16 try {
  17. 17 Thread.sleep(l);
  18. 18 } catch (InterruptedException e) {
  19. 19 e.printStackTrace();
  20. 20 }
  21. 21 }
  22. 22 }

3.4 消费者4的代码实现:

  1. 1 package com.qfedu.springbootmq.sequence.consumer;
  2. 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
  3. 3 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. 4 import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. 5 import org.springframework.stereotype.Component;
  6. 6 import java.util.Random;
  7. 7
  8. 8 @Component
  9. 9 @RabbitListener(queues = "q_seq2")
  10. 10 public class Consumer22 {
  11. 11 @RabbitHandler
  12. 12 public void onMessage(MessageInfo message) {
  13. 13 System.out.println("c22:" + message.getId() + ":" + message.getMsg());
  14. 14 // 随机休眠
  15. 15 long l = new Random(1000).nextLong();
  16. 16 try {
  17. 17 TimeUnit.MILLISECONDS.sleep(l);
  18. 18 } catch (InterruptedException e) {
  19. 19 e.printStackTrace();
  20. 20 }
  21. 21 }
  22. 22 }
4. 测试
发送4个消息模拟顺序消费的消息,id为1和3的发送到⼀个队列,id为2和4的发送到另⼀个队列。
  1. 1 @Test
  2. 2 public void testSeq() {
  3. 3 for (int i = 1; i <= 4; i++) {
  4. 4 producerSeq.send(i, "hello" + i);
  5. 5 }
  6. 6 try {
  7. 7 Thread.sleep(2000);
  8. 8 } catch (InterruptedException e) {
  9. 9 e.printStackTrace();
  10. 10 }
  11. 11 }

从结果中可以看到,虽然⼀个队列配置了两个消费者,但是每对顺序消息只有⼀个消费者顺序消费。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/621033
推荐阅读
相关标签
  

闽ICP备14008679号