当前位置:   article > 正文

消息队列——spring和springboot整合rabbitmq_rabbitmq消费者配置

rabbitmq消费者配置

目录

spring整合rabbitmq——生产者

rabbitmq配置文件信息

倒入生产者工程的相关代码

简单工作模式

spring整合rabbitmq——消费者

spring整合rabbitmq——配置详解

SpringBoot整合RabbitMQ——生产者

 SpringBoot整合RabbitMQ——消费者


 

spring整合rabbitmq——生产者

使用原生amqp来写应该已经没有这样的公司了

创建两个工程,一个生产者一个消费者,分别倒入如下依赖

  1. <dependencies>
  2. <!--上下文-->
  3. <dependency>
  4. <groupId>org.springframework</groupId>
  5. <artifactId>spring-context</artifactId>
  6. <version>5.1.7.RELEASE</version>
  7. </dependency>
  8. <!--spring整合amqp-->
  9. <dependency>
  10. <groupId>org.springframework.amqp</groupId>
  11. <artifactId>spring-rabbit</artifactId>
  12. <version>2.1.8.RELEASE</version>
  13. </dependency>
  14. <!--单元测试-->
  15. <dependency>
  16. <groupId>junit</groupId>
  17. <artifactId>junit</artifactId>
  18. <version>4.12</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.springframework</groupId>
  22. <artifactId>spring-test</artifactId>
  23. <version>5.1.7.RELEASE</version>
  24. </dependency>
  25. </dependencies>
  26. <build>
  27. <plugins>
  28. <!--编译插件-->
  29. <plugin>
  30. <groupId>org.apache.maven.plugins</groupId>
  31. <artifactId>maven-compiler-plugin</artifactId>
  32. <version>3.8.0</version>
  33. <configuration>
  34. <source>1.8</source>
  35. <target>1.8</target>
  36. </configuration>
  37. </plugin>
  38. </plugins>
  39. </build>

rabbitmq配置文件信息

rabbitmq.properties文件如下

  1. rabbitmq.host=172.16.98.133
  2. rabbitmq.port=5672
  3. rabbitmq.username=heima
  4. rabbitmq.password=heima
  5. rabbitmq.virtual-host=/itcast

倒入生产者工程的相关代码

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans.xsd
  8. http://www.springframework.org/schema/context
  9. https://www.springframework.org/schema/context/spring-context.xsd
  10. http://www.springframework.org/schema/rabbit
  11. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  12. <!--加载配置文件-->
  13. <context:property-placeholder location="classpath:/rabbitmq.properties"/>
  14. <!-- 定义rabbitmq connectionFactory -->
  15. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
  16. port="${rabbitmq.port}"
  17. username="${rabbitmq.username}"
  18. password="${rabbitmq.password}"
  19. virtual-host="${rabbitmq.virtual-host}"/>
  20. <!--定义管理交换机、队列-->
  21. <rabbit:admin connection-factory="connectionFactory"/>
  22. <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
  23. 默认交换机类型为direct,名字为:"",路由键为队列的名称
  24. -->
  25. <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
  26. <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
  27. <!--定义广播交换机中的持久化队列,不存在则自动创建-->
  28. <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
  29. <!--定义广播交换机中的持久化队列,不存在则自动创建-->
  30. <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
  31. <!--定义广播类型交换机;并绑定上述两个队列-->
  32. <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
  33. <rabbit:bindings>
  34. <rabbit:binding queue="spring_fanout_queue_1"/>
  35. <rabbit:binding queue="spring_fanout_queue_2"/>
  36. </rabbit:bindings>
  37. </rabbit:fanout-exchange>
  38. <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
  39. <!--定义广播交换机中的持久化队列,不存在则自动创建-->
  40. <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
  41. <!--定义广播交换机中的持久化队列,不存在则自动创建-->
  42. <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
  43. <!--定义广播交换机中的持久化队列,不存在则自动创建-->
  44. <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
  45. <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
  46. <rabbit:bindings>
  47. <rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
  48. <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
  49. <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
  50. </rabbit:bindings>
  51. </rabbit:topic-exchange>
  52. <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
  53. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
  54. </beans>

 上面这个配置文件准备了三种工作模式需要的队列和交换机。

简单工作模式

在测试类中加载配置文件并发送消息

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
  3. public class ProducerTest {
  4. //1.注入RabbitTemplate
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @Test
  8. public void testHelloWorld(){
  9. //2.发送消息
  10. rabbitTemplate.convertAndSend("spring_queue","hello-yhy");
  11. }
  12. /**
  13. * 发送fanout
  14. */
  15. @Test
  16. public void testFaonut(){
  17. //2.发送消息
  18. rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");
  19. }
  20. /**
  21. * 发送topic消息
  22. */
  23. @Test
  24. public void testTopic(){
  25. //2.发送消息
  26. rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","spring topic....");
  27. }
  28. }

运行上三个测试方法过后管理端如下,出现了新的队列和交换机和信息

   

spring整合rabbitmq——消费者

导入消费者的XML配置文件

消费者中还要创建对应的监听器的类,不然配置文件爆红

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans.xsd
  8. http://www.springframework.org/schema/context
  9. https://www.springframework.org/schema/context/spring-context.xsd
  10. http://www.springframework.org/schema/rabbit
  11. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  12. <!--加载配置文件-->
  13. <context:property-placeholder location="classpath:rabbitmq.properties"/>
  14. <!-- 定义rabbitmq connectionFactory -->
  15. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
  16. port="${rabbitmq.port}"
  17. username="${rabbitmq.username}"
  18. password="${rabbitmq.password}"
  19. virtual-host="${rabbitmq.virtual-host}"/>
  20. <bean id="springQueueListener" class="com.yhy.rabbitmq.listener.SpringQueueListener"/>
  21. <!-- <bean id="fanoutListener1" class="com.yhy.rabbitmq.listener.FanoutListener1"/>-->
  22. <!-- <bean id="fanoutListener2" class="com.yhy.rabbitmq.listener.FanoutListener2"/>-->
  23. <!-- <bean id="topicListenerStar" class="com.yhy.rabbitmq.listener.TopicListenerStar"/>-->
  24. <!-- <bean id="topicListenerWell" class="com.yhy.rabbitmq.listener.TopicListenerWell"/>-->
  25. <!-- <bean id="topicListenerWell2" class="com.yhy.rabbitmq.listener.TopicListenerWell2"/>-->
  26. <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
  27. <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
  28. <!-- <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>-->
  29. <!-- <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>-->
  30. <!-- <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>-->
  31. <!-- <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>-->
  32. <!-- <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
  33. </rabbit:listener-container>
  34. </beans>

然后创建一个简单工作模式需要的对应类

  1. public class SpringQueueListener implements MessageListener {
  2. @Override
  3. public void onMessage(Message message) {
  4. /**
  5. * 打印消息
  6. */
  7. System.out.println(new String(message.getBody()));
  8. }
  9. }

在测试类中弄个方法用来加载配置文件,配置文件一加载,上面的监听器就会自动执行的。

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
  3. public class ConsumerTest {
  4. @Test
  5. public void test1(){
  6. while(true){
  7. }
  8. }
  9. }

其余的都是一模一样的写法。

spring整合rabbitmq——配置详解

队列声明的参数 

广播类型的交换机和队列绑定时不需要指定路由key,direct和topic都要指定路由key.

SpringBoot整合RabbitMQ——生产者

引入如下依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-test</artifactId>
  8. </dependency>

 再在resources目录下写一个配置文件类

  1. # 配置RabbitMQ的基本信息 ip 端口 username password ...
  2. spring:
  3. rabbitmq:
  4. host:
  5. post: 5672
  6. username: guest
  7. password: guest
  8. virtual-host: /

创建启动类

  1. package com.yhy;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class ProducerApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(ProducerApplication.class);
  8. }
  9. }

准备一个配置类

  1. package com.yhy.rabbit.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitConfig {
  8. public static final String EXCHANGE_NAME="boot_topic_exchange";
  9. public static final String QUEUE_NAME="boot_queue";
  10. //1.交换机
  11. @Bean("bootExchange")
  12. public Exchange bootExchange(){
  13. return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  14. }
  15. //2.Queue队列
  16. @Bean("bootQueue")
  17. public Queue bootQueue(){
  18. return QueueBuilder.durable(QUEUE_NAME).build();
  19. }
  20. //3.队列和交换机绑定关系,Binding
  21. /**
  22. * 1.知道哪个队列
  23. * 2.知道哪个交换机
  24. * 3.routing key
  25. */
  26. @Bean
  27. public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
  28. return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
  29. }
  30. }

在测试类中准备如下测试方法

  1. @SpringBootTest
  2. @RunWith(SpringRunner.class)
  3. public class ProducerTest {
  4. //1.注入RabbitTemplate
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @Test
  8. public void testSend(){
  9. rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"boot.haha","boot mq hello");
  10. }
  11. }

运行后可以看见出现有新队列和消息

 SpringBoot整合RabbitMQ——消费者

 在性工程创建一个监听类如下,加上@Component注解之后就可以自动执行一次了

  1. @Component
  2. public class RabbitMQListener {
  3. @RabbitListener(queues="boot_queue")
  4. public void ListenerQueue(Message message){
  5. System.out.println(message);
  6. }
  7. }

输出如下,成功获取到上面生产者发出的消息

   

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/386643
推荐阅读
相关标签
  

闽ICP备14008679号