赞
踩
代码实现
1).生产者和消费者都要写配置文件
spring:
rabbitmq:
host: 192.168.4.216 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: username # 用户名
password: password # 密码
2).生产者发送信息
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
3).消费者监听队列
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
应用场景
:当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。
1).消息发送
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
2).消息接收
模拟多个消费者绑定同一个队列
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
3).测试结果
本次测试让两个线程睡眠的时间不同,模拟的就是,一个消费者消费信息的速度比另一个快
。
消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的
4)能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
在广播模式下,消息发送流程是这样的:
代码实现:
1).写一个配置类来声明队列和交换机
@Configuration public class NotifyConfig { //交换机 public static final String EXCHANGE_FANOUT = "fanout"; //队列名称 public static final String NOTIFY_QUEUE = "notify_queue"; public static final String REPLY_QUEUE = "reply_queue"; //声明交换机 @Bean(EXCHANGE_FANOUT) public FanoutExchange exchange_direct(){ // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new FanoutExchange(EXCHANGE_FANOUT, true, false); } //声明队列 @Bean("SIMPLE_QUEUE") public Queue simple_queue(){ return QueueBuilder.durable("simple.queue").build(); } //声明队列 @Bean(NOTIFY_QUEUE) public Queue publish_queue(){ return QueueBuilder.durable(NOTIFY_QUEUE).build(); } //交换机和队列绑定 @Bean public Binding binding_publish_queue(@Qualifier(NOTIFY_QUEUE) Queue queue, @Qualifier(EXCHANGE_FANOUT) FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } }
2).消息发送
void convertAndSend(String exchange, String routingKey, Object object)
//参数信息故名思意,中间这个routingkey到Direct来详细讲讲
rabbitTemplate.convertAndSend(NotifyConfig.EXCHANGE_FANOUT, "",message);
3).消息接收
@RabbitListener(queues = NotifyConfig.NOTIFY_QUEUE)
public void receive(String message) {
System.out.println("数据类型为string :spring notify 消费者接收到消息:【" + message + "】");
}
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
RoutingKey
(路由key)RoutingKey
。Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息1).消费者绑定
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】"); }
注意:指定的key不一样
2).消息发送
rabbitTemplate.convertAndSend(exchangeName, "red", message);
Fanout和Direct的差别
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
1)、消息发送
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
2)、消息接收
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】"); }
3)、结果
" + msg + “】”);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = “topic.queue2”),
exchange = @Exchange(name = “itcast.topic”, type = ExchangeTypes.TOPIC),
key = “#.news”
))
public void listenTopicQueue2(String msg){
System.out.println(“消费者接收到topic.queue2的消息:【” + msg + “】”);
}
3)、结果
两个队列都能收到消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。