赞
踩
很多时候,消息的消费是不⽤保证顺序的,⽐如借助mq实现订单超时的处理。但有些时候,业务中可 能会存在多个消息需要顺序处理的情况,⽐如⽣成订单和扣减库存消息,那肯定是先执⾏⽣成订单的 操作,再执⾏扣减库存的操作。 那么这种情况下,是如何保证消息顺序消费的呢?
⾸先,为了效率,我们可以设置多个队列都来处理顺序执⾏的消息。另外,我们需要保证每组顺序消 费的消息发到同⼀个队列中,给这些消息设置⼀个统⼀的全局id即可。
其次,保证消息的顺序消费。就像上⾯所说,⼀个队列对应⼀个消费者即可,但是在项⽬的集群部署 下,这⼜该怎么处理呢?针对这种情况,我们可以设置队列的“单活模式”。
x-single-active-consumer:单活模式,表⽰是否最多只允许⼀个消费者消费,如果有多个 消费者同时绑定,则只会激活第⼀个,除⾮第⼀个消费者被取消或者死亡,才会⾃动转到下 ⼀个消费者
假设现在我们有两个队列处理顺序消息(消息1-1和1-2属于⼀组需要顺序消费的消息,消息2-1和2-2属 于另⼀组需要顺序消费的消息),每个队列有两个消费者(模拟消费者集群)。
1. 队列的配置类
- package com.zst.springbootmq.sequence.config;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import java.util.HashMap;
- @Configuration
- public class SeqQueueConfiguration {
- /**
- * 创建两个队列,处理顺序消息
- */
- @Bean
- public Queue seqQueue1() {
- return creatQueue("q_seq1");
- }
- @Bean
- public Queue seqQueue2() {
- return creatQueue("q_seq2");
- }
- // 交换机
- @Bean
- public DirectExchange seqDirectExchange() {
- return new DirectExchange("direct_seq");
- }
- // 队列绑定交换机,执⾏路由key
- @Bean
- public Binding seqBinding1() {
- return BindingBuilder.bind(seqQueue1()).to(seqDirectExchange()).with("1"
- }
- @Bean
- public Binding seqBinding2() {
- return BindingBuilder.bind(seqQueue2()).to(seqDirectExchange()).with("2"
- }
- /**
- * 创建⼀个 单活模式的队列
- * @param name
- * @return queue
- */
- private Queue creatQueue(String name) {
- HashMap<String, Object> args = new HashMap<>();
- // x-single-active-consumer 单活模式 队列
- // 表⽰是否最多只允许⼀个消费者消费,如果有多个消费者同时绑定,则只会激活第⼀个,
- // 除⾮第⼀个消费者被取消或者死亡,才会⾃动转到下⼀个消费者。
- args.put("x-single-active-consumer", true);
- return new Queue(name, true, false, false, args);
- }
- }
-
2. ⽣产者
- package com.zst.springbootmq.sequence.producer;
- import com.qfedu.springbootmq.sequence.message.MessageInfo;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- @Component
- public class ProducerSeq {
- @Resource
- private RabbitTemplate rabbitTemplate;
- /**
- * 根据id,将消息顺序发送到对应的队列
- * @param id 业务id
- * @param msg 业务信息
- */
- public void send(int id, String msg) {
- MessageInfo message = new MessageInfo(id, msg);
- rabbitTemplate.convertAndSend("direct_seq", String.valueOf(id % 2 + 1),
- }
- }
3. 消费者
- package com.zst.springbootmq.sequence.consumer;
- import com.qfedu.springbootmq.sequence.message.MessageInfo;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.util.Random;
- @Component
- @RabbitListener(queues = "q_seq1")
- public class Consumer11 {
- @RabbitHandler
- public void onMessage(MessageInfo message) {
- System.out.println("c11:" + message.getId() + ":" + message.getMsg());
- // 随机休眠
- long l = new Random(1000).nextLong();
- try {
- Thread.sleep(l);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
消费者2的代码实现:
- package com.zst.springbootmq.sequence.consumer;
- import com.qfedu.springbootmq.sequence.message.MessageInfo;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.util.Random;
- @Component
- @RabbitListener(queues = "q_seq1")
- public class Consumer12 {
- @RabbitHandler
- public void onMessage(MessageInfo message) {
- System.out.println("c12:" + message.getId() + ":" + message.getMsg());
- // 随机休眠
- long l = new Random(1000).nextLong();
- try {
- Thread.sleep(l);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
消费者3的代码实现:
- package com.zst.springbootmq.sequence.consumer;
- import com.zst.springbootmq.sequence.message.MessageInfo;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.util.Random;
- @Component
- @RabbitListener(queues = "q_seq2")
- public class Consumer21 {
- @RabbitHandler
- public void onMessage(MessageInfo message) {
- System.out.println("c21:" + message.getId() + ":" + message.getMsg());
- // 随机休眠
- long l = new Random(1000).nextLong();
- try {
- Thread.sleep(l);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
消费者4的代码实现:
- package com.zst.springbootmq.sequence.consumer;
- import com.zst.springbootmq.sequence.message.MessageInfo;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.util.Random;
- @Component
- @RabbitListener(queues = "q_seq2")
- public class Consumer22 {
- @RabbitHandler
- public void onMessage(MessageInfo message) {
- System.out.println("c22:" + message.getId() + ":" + message.getMsg());
- // 随机休眠
- long l = new Random(1000).nextLong();
- try {
- TimeUnit.MILLISECONDS.sleep(l);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
4. 测试
发送4个消息模拟顺序消费的消息,id为1和3的发送到⼀个队列,id为2和4的发送到另⼀个队列
- @Test
- public void testSeq() {
- for (int i = 1; i <= 4; i++) {
- producerSeq.send(i, "hello" + i);
- }
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
输出结果:
从结果中可以看到,虽然⼀个队列配置了两个消费者,但是每对顺序消息只有⼀个消费者顺序消费。
另外,我们还可以看到队列中“SAC”,表⽰启⽤了单活模式,这样我们就实现了这个需求
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。