赞
踩
x-single-active-consumer:单活模式,表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。
- 1 package com.qfedu.springbootmq.sequence.config;
- 2 import org.springframework.amqp.core.Binding;
- 3 import org.springframework.amqp.core.BindingBuilder;
- 4 import org.springframework.amqp.core.DirectExchange;
- 5 import org.springframework.amqp.core.Queue;
- 6 import org.springframework.context.annotation.Bean;
- 7 import org.springframework.context.annotation.Configuration;
- 8 import java.util.HashMap;
- 9
- 10 @Configuration
- 11 public class SeqQueueConfiguration {
- 12 /**
- 13 * 创建两个队列,处理顺序消息
- 14 */
- 15 @Bean
- 16 public Queue seqQueue1() {
- 17 return creatQueue("q_seq1");
- 18 }
- 19
- 20 @Bean
- 21 public Queue seqQueue2() {
- 22 return creatQueue("q_seq2");
- 23 }
- 24 // 交换机
- 25 @Bean
- 26 public DirectExchange seqDirectExchange() {
- 27 return new DirectExchange("direct_seq");
- 28 }
- 29
- 30 // 队列绑定交换机,执⾏路由key
- 31 @Bean
- 32 public Binding seqBinding1() {
- 33 return BindingBuilder.bind(seqQueue1()).to(seqDirectExchange()).with("1"
- 34 }
- 35
- 36 @Bean
- 37 public Binding seqBinding2() {
- 38 return BindingBuilder.bind(seqQueue2()).to(seqDirectExchange()).with("2"
- 39 }
- 40
- 41 /**
- 42 * 创建⼀个 单活模式的队列
- 43 * @param name
- 44 * @return queue
- 45 */
- 46 private Queue creatQueue(String name) {
- 47 HashMap<String, Object> args = new HashMap<>();
- 48 // x-single-active-consumer 单活模式 队列
- 49 // 表⽰是否最多只允许⼀个消费者消费,如果有多个消费者同时绑定,则只会激活第⼀个,
- 50 // 除⾮第⼀个消费者被取消或者死亡,才会⾃动转到下⼀个消费者。
- 51 args.put("x-single-active-consumer", true);
- 52 return new Queue(name, true, false, false, args);
- 53 }
- 54 };
- 1 package com.qfedu.springbootmq.sequence.producer;
- 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
- 3 import lombok.extern.slf4j.Slf4j;
- 4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
- 5 import org.springframework.stereotype.Component;
- 6 import javax.annotation.Resource;
- 7
- 8 @Component
- 9 public class ProducerSeq {
- 10 @Resource
- 11 private RabbitTemplate rabbitTemplate;
- 12
- 13 /**
- 14 * 根据id,将消息顺序发送到对应的队列
- 15 * @param id 业务id
- 16 * @param msg 业务信息
- 17 */
- 18 public void send(int id, String msg) {
- 19 MessageInfo message = new MessageInfo(id, msg);
- 20 rabbitTemplate.convertAndSend("direct_seq", String.valueOf(id % 2 + 1),
- 21 }
- 22 }
- 1 package com.qfedu.springbootmq.sequence.consumer;
- 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
- 3 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- 4 import org.springframework.amqp.rabbit.annotation.RabbitListener;
- 5 import org.springframework.stereotype.Component;
- 6 import java.util.Random;
- 7
- 8 @Component
- 9 @RabbitListener(queues = "q_seq1")
- 10 public class Consumer11 {
- 11 @RabbitHandler
- 12 public void onMessage(MessageInfo message) {
- 13 System.out.println("c11:" + message.getId() + ":" + message.getMsg());
- 14 // 随机休眠
- 15 long l = new Random(1000).nextLong();
- 16 try {
- 17 Thread.sleep(l);
- 18 } catch (InterruptedException e) {
- 19 e.printStackTrace();
- 20 }
- 21 }
- 22 }
- 1 package com.qfedu.springbootmq.sequence.consumer;
- 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
- 3 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- 4 import org.springframework.amqp.rabbit.annotation.RabbitListener;
- 5 import org.springframework.stereotype.Component;
- 6 import java.util.Random;
- 7
- 8 @Component
- 9 @RabbitListener(queues = "q_seq1")
- 10 public class Consumer12 {
- 11 @RabbitHandler
- 12 public void onMessage(MessageInfo message) {
- 13 System.out.println("c12:" + message.getId() + ":" + message.getMsg());
- 14 // 随机休眠
- 15 long l = new Random(1000).nextLong();
- 16 try {
- 17 Thread.sleep(l);
- 18 } catch (InterruptedException e) {
- 19 e.printStackTrace();
- 20 }
- 21 }
- 22 }
- 1 package com.qfedu.springbootmq.sequence.consumer;
- 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
- 3 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- 4 import org.springframework.amqp.rabbit.annotation.RabbitListener;
- 5 import org.springframework.stereotype.Component;
- 6 import java.util.Random;
- 7
- 8 @Component
- 9 @RabbitListener(queues = "q_seq2")
- 10 public class Consumer21 {
- 11 @RabbitHandler
- 12 public void onMessage(MessageInfo message) {
- 13 System.out.println("c21:" + message.getId() + ":" + message.getMsg());
- 14 // 随机休眠
- 15 long l = new Random(1000).nextLong();
- 16 try {
- 17 Thread.sleep(l);
- 18 } catch (InterruptedException e) {
- 19 e.printStackTrace();
- 20 }
- 21 }
- 22 }
3.4 消费者4的代码实现:
- 1 package com.qfedu.springbootmq.sequence.consumer;
- 2 import com.qfedu.springbootmq.sequence.message.MessageInfo;
- 3 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- 4 import org.springframework.amqp.rabbit.annotation.RabbitListener;
- 5 import org.springframework.stereotype.Component;
- 6 import java.util.Random;
- 7
- 8 @Component
- 9 @RabbitListener(queues = "q_seq2")
- 10 public class Consumer22 {
- 11 @RabbitHandler
- 12 public void onMessage(MessageInfo message) {
- 13 System.out.println("c22:" + message.getId() + ":" + message.getMsg());
- 14 // 随机休眠
- 15 long l = new Random(1000).nextLong();
- 16 try {
- 17 TimeUnit.MILLISECONDS.sleep(l);
- 18 } catch (InterruptedException e) {
- 19 e.printStackTrace();
- 20 }
- 21 }
- 22 }
- 1 @Test
- 2 public void testSeq() {
- 3 for (int i = 1; i <= 4; i++) {
- 4 producerSeq.send(i, "hello" + i);
- 5 }
- 6 try {
- 7 Thread.sleep(2000);
- 8 } catch (InterruptedException e) {
- 9 e.printStackTrace();
- 10 }
- 11 }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。