赞
踩
- <!--rabbitmq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- spring:
- rabbitmq:
- host: 你的rabbitmq的ip
- port: 5672
- username: guest
- password: guest
访问接口:http://localhost:15672,账号密码都为guest
进入后左下角有Add queue添加队列,我已添加队列为MqTest1
- @SpringBootTest
- class RabbitMQDemoPublishApplicationTests {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void contextLoads() {
- String queue="MqTest1";
- String message="message1";
- rabbitTemplate.convertAndSend(queue,message);
- }
-
- }
此时可以看到队列有一个消息
- package com.rabbitmqdemoconsumer.rabbitmq;
-
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class SpringRabbitLeistener {
-
- @RabbitListener(queues = "MqTest1")
- public void listenSimpleQueueMessage(String msg){
- System.out.println("接收到的消息:"+msg);
- }
- }
此时控制台输出接收到的消息
可以提高消息处理速度,避免队列消息堆积
- @SpringBootTest
- class RabbitMQDemoPublishApplicationTests {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void contextLoads() {
- String queue="MqTest1";
- String message="message1";
- for (int i=0;i<10;i++){
- rabbitTemplate.convertAndSend(queue,message);
- }
- }
-
- }
此时队列有10条消息
- package com.rabbitmqdemoconsumer.rabbitmq;
-
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class SpringRabbitLeistener {
-
- @RabbitListener(queues = "MqTest1")
- public void listenSimpleQueueMessage1(String msg){
- System.out.println("consume1接收到的消息:"+msg);
- }
- @RabbitListener(queues = "MqTest1")
- public void listenSimpleQueueMessage2(String msg){
- System.out.println("consume2接收到的消息:"+msg);
- }
- }
- consume1接收到的消息:message1
- consume2接收到的消息:message1
- consume1接收到的消息:message1
- consume2接收到的消息:message1
- consume1接收到的消息:message1
- consume2接收到的消息:message1
- consume1接收到的消息:message1
- consume2接收到的消息:message1
- consume1接收到的消息:message1
- consume2接收到的消息:message1
但是此时有一个问题就是消息预取,比如队列有10条消息,两个消费者各自直接先预取5个消息,如果一个消费者接受消息的速度慢,一个快,就会导致一个消费者已经完成工作,另一个还在慢慢处理,会造成消息堆积消费者身上,要解决这个问题需要在yml文件配置相关配置
- rabbitmq:
- host: 43.140.244.236
- port: 5672
- username: guest
- password: guest
- virtual-host: /
- listener:
- simple:
- prefetch: 1 #每次只能取一个,处理完才能取下一个消息
这样可以避免消息预取导致堆积
exchange是交换机,负责消息路由,但不存储消息,路由失败则消息丢失
- package com.rabbitmqdemoconsumer.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class FanountConfig {
- //交换机声明
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("FanountExchange");
- }
- //声明队列1
- @Bean
- public Queue Fanount_Qeueue1(){
- return new Queue("Fanount_Qeueue1");
- }
- //声明队列2
- @Bean
- public Queue Fanount_Qeueue2(){
- return new Queue("Fanount_Qeueue2");
- }
- //绑定交换机和队列
- @Bean
- public Binding bindingFanount_Qeueue1(Queue Fanount_Qeueue1,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(Fanount_Qeueue1).to(fanoutExchange);
- }
- @Bean
- public Binding bindingFanount_Qeueue2(Queue Fanount_Qeueue2,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(Fanount_Qeueue2).to(fanoutExchange);
- }
- }
可以看到声明的队列
已经声明的交换机(第一个)
绑定关系
首先发送10条消息,经过交换机转发到队列
- @SpringBootTest
- class RabbitMQDemoPublishApplicationTests {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void contextLoads2() {
- String exchange="FanountExchange";
- String message="message";
- for (int i=0;i<10;i++){
- rabbitTemplate.convertAndSend(exchange,"",message);
- }
- }
-
- }
此时可以看到两个队列各自有十条消息
- //监听交换机Fanount_Qeueue1
- @RabbitListener(queues = "Fanount_Qeueue1")
- public void listenFanountQeueue1(String msg){
- System.out.println("Fanount_Qeueue1接收到的消息:"+msg);
- }
- //监听交换机Fanount_Qeueue2
- @RabbitListener(queues = "Fanount_Qeueue2")
- public void listenFanountQeueue2(String msg){
- System.out.println("Fanount_Qeueue2接收到的消息:"+msg);
- }
控制台结果如下(共发送20条,每个队列10条)
Fanount_Qeueue1接收到的消息:message Fanount_Qeueue1接收到的消息:message Fanount_Qeueue1接收到的消息:message Fanount_Qeueue1接收到的消息:message Fanount_Qeueue2接收到的消息:message Fanount_Qeueue1接收到的消息:message Fanount_Qeueue2接收到的消息:message Fanount_Qeueue1接收到的消息:message Fanount_Qeueue2接收到的消息:message Fanount_Qeueue1接收到的消息:message Fanount_Qeueue2接收到的消息:message Fanount_Qeueue1接收到的消息:message Fanount_Qeueue2接收到的消息:message Fanount_Qeueue1接收到的消息:message Fanount_Qeueue2接收到的消息:message Fanount_Qeueue2接收到的消息:message Fanount_Qeueue2接收到的消息:message Fanount_Qeueue2接收到的消息:message
会将消息根据规则路由到指定的队列
- package com.rabbitmqdemoconsumer.rabbitmq;
-
- import org.springframework.amqp.core.ExchangeTypes;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class SpringRabbitLeistener {
-
- /**
- * 绑定交换机和队列,并为key赋值
- * @param msg
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "DirectQueue1"),
- exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
- key = {"red","blue"}
- ))
- public void listenDirectQueue1(String msg){
- System.out.println("listenDirectQueue1接收到的消息:"+msg);
- }
-
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "DirectQueue2"),
- exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
- key = {"red","yellow"}
- ))
- public void listenDirectQueue2(String msg){
- System.out.println("listenDirectQueue2接收到的消息:"+msg);
- }
- }
此时可以看到声明的队列
声明的交换机(第一个)
绑定关系
发送消息
-
- @SpringBootTest
- class RabbitMQDemoPublishApplicationTests {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void contextLoads2() {
- String exchange="DirectExchange";
- String message="HelloWorld";
- for (int i=0;i<10;i++){
- rabbitTemplate.convertAndSend(exchange,"blue",message);
- }
- }
-
- }
接收消息
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
发送消息
- @SpringBootTest
- class RabbitMQDemoPublishApplicationTests {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void contextLoads2() {
- String exchange="DirectExchange";
- String message="HelloWorld";
- for (int i=0;i<10;i++){
- rabbitTemplate.convertAndSend(exchange,"blue",message);
- }
- }
-
- }
接收消息
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
- listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
- listenDirectQueue1(red,blue)接收到的消息:HelloWorld
Queue与Exchange指定BindingKey可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
比如:
bindingkey: china.# ->中国的所有消息
bindingkey: #.weather ->所以国家的天气
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "TopicQueue1"),
- exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
- key = {"china.#"}
- ))
- public void listenTopicQueue1(String msg){
- System.out.println("listenTopicQueue1接收到的消息:"+msg);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "TopicQueue2"),
- exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
- key = {"#.news"}
- ))
- public void listenTopicQueue2(String msg){
- System.out.println("listenTopicQueue2接收到的消息:"+msg);
- }
队列
交换机(第四个)
绑定关系
- package com.rabbitmqdemo;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- @SpringBootTest
- class RabbitMQDemoPublishApplicationTests {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void contextLoads2() {
- String exchange="TopicExchange";
- String message="HelloWorld";
- for (int i=0;i<10;i++){
- rabbitTemplate.convertAndSend(exchange,"china.news",message);
- }
- }
-
- }
接收消息
TopicQueue2接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue2接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue2接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue2接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue2接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue2接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue2接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue2接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue2接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue1接收到的消息:HelloWorld TopicQueue2接收到的消息:HelloWorld
发送消息
- package com.rabbitmqdemo;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- @SpringBootTest
- class RabbitMQDemoPublishApplicationTests {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void contextLoads2() {
- String exchange="TopicExchange";
- String message="HelloWorld";
- for (int i=0;i<10;i++){
- rabbitTemplate.convertAndSend(exchange,"china.weather",message);
- }
- }
-
- }
接收消息
- TopicQueue1接收到的消息:HelloWorld
- TopicQueue1接收到的消息:HelloWorld
- TopicQueue1接收到的消息:HelloWorld
- TopicQueue1接收到的消息:HelloWorld
- TopicQueue1接收到的消息:HelloWorld
- TopicQueue1接收到的消息:HelloWorld
- TopicQueue1接收到的消息:HelloWorld
- TopicQueue1接收到的消息:HelloWorld
- TopicQueue1接收到的消息:HelloWorld
- TopicQueue1接收到的消息:HelloWorld
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。