赞
踩
工作队列模式:一个生产者,多个消费者(可以选择竞争模式或者公平模式)
可以不选择交换机,使用默认交换机,我这里使用topic交换机
package com.gitee.small.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicRabbitConfig { //绑定键 public final static String DOG = "topic.dog"; /** * Queue构造函数参数说明 * new Queue(SMS_QUEUE, true); * 1. 队列名 * 2. 是否持久化 true:持久化 false:不持久化 */ @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.DOG); } @Bean public TopicExchange exchange() { return new TopicExchange("topicExchange"); } /** * 将firstQueue和topicExchange绑定,而且绑定的键值为topic.dog * 这样只要是消息携带的路由键是topic.dog,才会分发到该队列 */ @Bean public Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(DOG); } }
通过rabbitTemplate发送消息,topicExchange是指定交换机,topic.dog是上面交换机绑定的路由,不是队列名称
rabbitTemplate.convertAndSend("topicExchange","topic.dog","工作队列模式测试");
指定两个监听器消费当前队列,设置竞争模式,由于我在一个工程里写了两个监听器,所以不要重复调用channel.basicAck方法手动确认消息,直接采用默认的自动确认消息即可
package com.gitee.small.rabbitmq; import com.rabbitmq.client.Channel; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.TimeUnit; @Component @Slf4j public class RabbitReceiver { @SneakyThrows @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.dog"), exchange = @Exchange(value = "bindingExchangeMessage", type = ExchangeTypes.TOPIC) )) public void process(String msg, Channel channel, Message message) { TimeUnit.SECONDS.sleep(1); channel.basicQos(1); log.info("dog1-收到消息:{}", msg); // // 手动确认消息,但是不可重复确认,因为本示例中有两个监听器,所以手动确认这行代码注释掉 // channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } @SneakyThrows @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.dog"), exchange = @Exchange(value = "bindingExchangeMessage", type = ExchangeTypes.TOPIC) )) public void process2(String msg, Channel channel, Message message) { channel.basicQos(1); TimeUnit.SECONDS.sleep(2); log.info("dog2-收到消息:{}", msg); // // 手动确认消息,但是不可重复确认,因为本示例中有两个监听器,所以手动确认这行代码注释掉 // channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
结果示例
2023-01-10 14:00:18.166 INFO 8676 --- [ntContainer#0-1] com.gitee.small.rabbitmq.RabbitReceiver : dog1-收到消息:工作队列模式测试0
2023-01-10 14:00:19.175 INFO 8676 --- [ntContainer#1-1] com.gitee.small.rabbitmq.RabbitReceiver : dog2-收到消息:工作队列模式测试1
2023-01-10 14:00:19.196 INFO 8676 --- [ntContainer#0-1] com.gitee.small.rabbitmq.RabbitReceiver : dog1-收到消息:工作队列模式测试2
2023-01-10 14:00:20.222 INFO 8676 --- [ntContainer#0-1] com.gitee.small.rabbitmq.RabbitReceiver : dog1-收到消息:工作队列模式测试4
2023-01-10 14:00:21.196 INFO 8676 --- [ntContainer#1-1] com.gitee.small.rabbitmq.RabbitReceiver : dog2-收到消息:工作队列模式测试3
2023-01-10 14:00:21.248 INFO 8676 --- [ntContainer#0-1] com.gitee.small.rabbitmq.RabbitReceiver : dog1-收到消息:工作队列模式测试6
2023-01-10 14:00:23.222 INFO 8676 --- [ntContainer#1-1] com.gitee.small.rabbitmq.RabbitReceiver : dog2-收到消息:工作队列模式测试5
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。