当前位置:   article > 正文

使用Springboot配置生产者、消费者RabbitMQ?

使用Springboot配置生产者、消费者RabbitMQ?

生产者服务

1、引入依赖以及配置rabbitmq

此时我们通过使用springboot来快速搭建一个生产者服务

  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

application.yml

  1. server:
  2.   port: 8080
  3. spring:
  4.   application:
  5.     name: producter
  6.   rabbitmq:
  7.     addresses: 192.168.118.100:5672
  8.     username: guest
  9.     password: guest
  10.     virtual-host: /
  11.     connection-timeout: 15000
2、定义队列、交换机以及绑定的routingkey

添加一个config包下添加文件

TopicRabbitConfig.java、MsgSender.java

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.amqp.core.TopicExchange;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. //Topic类型交换机配置
  8. @Configuration
  9. public class TopicRabbitConfig {
  10. //配置队列
  11. @Bean
  12. public Queue queue1(){
  13. return new Queue("queue1");
  14. }
  15. @Bean
  16. public Queue queue2(){
  17. return new Queue("queue2");
  18. }
  19. //配置交换机
  20. @Bean
  21. public TopicExchange exchange(){
  22. return new TopicExchange("bootExchange");
  23. }
  24. //绑定队列到交换机并且执行routingkey,之后指定消费者即可通过指定队列来拿到信息
  25. @Bean
  26. public Binding bindingExchangeMessage1(Queue queue1,TopicExchange exchange){
  27. return BindingBuilder.bind(queue1).to(exchange).with("cat.red");
  28. }
  29. @Bean
  30. public Binding bindingExchangeMessage2(Queue queue2,TopicExchange exchange){
  31. return BindingBuilder.bind(queue2).to(exchange).with("*.red");
  32. }
  33. }
  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. //@Description 发送消息到指定交换机
  5. @Component
  6. public class MsgSender {
  7. @Autowired
  8. private AmqpTemplate amqpTemplate;
  9. private static String EXCHANGE_NAME = "bootExchange";
  10. //发送信息的routingkey=>"cat.red"
  11. public void send1(){
  12. String routingKey = "cat.red";
  13. String msg = "this is my message,routingkey is "+routingKey;
  14. //交换机名称、routingkey以及发送的信息
  15. amqpTemplate.convertAndSend(EXCHANGE_NAME,routingKey,msg);
  16. System.out.println("已成功发送信息:"+msg);
  17. }
  18. //发送信息的routingkey=>"dog.red"
  19. public void send2(){
  20. String routingKey = "dog.red";
  21. String msg = "this is my message,routingkey is "+routingKey;
  22. //交换机名称、routingkey以及发送的信息
  23. amqpTemplate.convertAndSend(EXCHANGE_NAME,routingKey,msg);
  24. System.out.println("已成功发送信息:"+msg);
  25. }
  26. }
3、创建一个测试类

调用两个方法:

  1. import com.changlu.productor.config.MsgSender;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. @SpringBootTest
  6. class ProductorApplicationTests {
  7. @Autowired
  8. private MsgSender msgSender;
  9. @Test
  10. void sendMsg1() {
  11. msgSender.send1();
  12. msgSender.send2();
  13. }
  14. }

当我用2.4.5的时候读不出来,推存使用高版本的spring boot版本

消费者服务

1、引入依赖以及配置rabbitmq
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

application.yml

  1. server:
  2. port: 8081 # 设置8081端口
  3. spring:
  4. application:
  5. name: consumer
  6. rabbitmq:
  7. addresses: 192.168.118.100:5672 # 同样rabbitmq运行端口默认为5672
  8. username: guest
  9. password: guest
  10. virtual-host: /
  11. connection-timeout: 15000
2、定义消费者并进行绑定监听

消费者1:绑定对应的queue1

  1. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. //queue1对应routingkey=>cat.red
  5. @Component
  6. @RabbitListener(queues = "queue1")
  7. public class Consumer1 {
  8. @RabbitHandler
  9. public void process(String msg){
  10. System.out.println("queue1收到消息:"+msg);
  11. }
  12. }

消费者2:绑定对应的queue2

  1. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. //queue2对应routingkey=>*.red
  5. @Component
  6. @RabbitListener(queues = "queue2")
  7. public class Consumer2 {
  8. @RabbitHandler
  9. public void process(String msg){
  10. System.out.println("queue2收到消息:"+msg);
  11. }
  12. }

测试

对于生产者服务或是消费者服务在该案例中任一一方启动都没有关系。本案例是topic类型的交换机,生产者服务先启动发送的消息会被暂时存储到指定队列中。

紧接着我们启动消费者服务,可以看到对应的queue1、queue2消费者分别收到了对应routingkey匹配的信息,此时我们可以来进行处理了! 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/402403?site
推荐阅读
相关标签
  

闽ICP备14008679号