当前位置:   article > 正文

RabbitMQ——SpringBoot集成RabbitMQ_若依集成rabbitmq

若依集成rabbitmq

文章目录:

1.创建一个SpringBoot工程——消息发送者

1.创建一个SpringBoot工程——消息接收者

3.测试结果

3.1 direct

3.2 fanout

3.3 topic

3.4 RabbitMQ管控台中查看SpringBoot工程创建的交换机和消息队列


1.创建一个SpringBoot工程——消息发送者

前两步都是一样的,只不过在依赖项页面中,要勾选RabbitMQ这个选项。

在核心配置文件中,配置RabbitMQ的相关连接信息。

  1. #配置RabbitMQ的相关连接信息
  2. spring.rabbitmq.host=192.168.40.130
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=root
  5. spring.rabbitmq.password=root

编写实现消息发送的接口和实现类。

接口中的三个方法分别对应 direct、fanout、topic三种类型的交换机,我这里测试这三种类型的交换机来发送接收消息。

  1. package com.szh.springboot.rabbitmq.service;
  2. /**
  3. *
  4. */
  5. public interface SendService {
  6. void sendMessage(String message);
  7. void sendFanout(String message);
  8. void sendTopic(String message);
  9. }
  1. package com.szh.springboot.rabbitmq.service.impl;
  2. import com.szh.springboot.rabbitmq.service.SendService;
  3. import org.springframework.amqp.core.AmqpTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. /**
  7. *
  8. */
  9. @Service("sendService")
  10. public class SendServiceImpl implements SendService {
  11. @Autowired
  12. private AmqpTemplate amqpTemplate;
  13. @Override
  14. public void sendMessage(String message) {
  15. /**
  16. * 发送消息
  17. * 参数1:交换机名称
  18. * 参数2:RoutingKey
  19. * 参数3:具体发送的消息内容
  20. */
  21. amqpTemplate.convertAndSend("springbootDirectExchange","springbootDirectRouting",message);
  22. }
  23. @Override
  24. public void sendFanout(String message) {
  25. amqpTemplate.convertAndSend("fanoutExchange","",message);
  26. }
  27. @Override
  28. public void sendTopic(String message) {
  29. amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
  30. }
  31. }

然后写一个关于三种类型交换机的配置类。

  1. package com.szh.springboot.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. *
  7. */
  8. @Configuration
  9. public class RabbitMQConfig {
  10. //配置一个Direct类型的交换机
  11. @Bean
  12. public DirectExchange directExchange() {
  13. return new DirectExchange("springbootDirectExchange");
  14. }
  15. //配置一个队列
  16. @Bean
  17. public Queue directQueue() {
  18. return new Queue("springbootDirectQueue");
  19. }
  20. /**
  21. * 配置一个队列和交换机的绑定
  22. * @param directQueue : 需要绑定的队列对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .bind()
  23. * @param directExchange : 需要绑定的交换机对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .to()
  24. * .with() 方法对应的RoutingKey
  25. * @return
  26. */
  27. @Bean
  28. public Binding directBinding(Queue directQueue,DirectExchange directExchange) {
  29. return BindingBuilder.bind(directQueue).to(directExchange).with("springbootDirectRouting");
  30. }
  31. //配置一个Fanout类型的交换机
  32. @Bean
  33. public FanoutExchange fanoutExchange() {
  34. return new FanoutExchange("fanoutExchange");
  35. }
  36. //配置一个Topic类型的交换机
  37. @Bean
  38. public TopicExchange topicExchange() {
  39. return new TopicExchange("topicExchange");
  40. }
  41. }

最后是SpringBoot项目的启动入口类。

这里首先是通过ApplicationContext获取到了Spring容器,然后从容器中拿到sendService这个对象,最后的三行代码分别对应的是测试这三种类型的交换机。

  1. package com.szh.springboot.rabbitmq;
  2. import com.szh.springboot.rabbitmq.service.SendService;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.ApplicationContext;
  6. @SpringBootApplication
  7. public class Application {
  8. public static void main(String[] args) {
  9. ApplicationContext context=SpringApplication.run(Application.class, args);
  10. SendService service= (SendService) context.getBean("sendService");
  11. service.sendMessage("SpringBoot集成RabbitMQ的测试数据");
  12. //service.sendFanout("SpringBoot集成RabbitMQ的测试数据");
  13. //service.sendTopic("SpringBoot集成RabbitMQ的测试数据");
  14. }
  15. }

1.创建一个SpringBoot工程——消息接收者

前两步都是一样的,只不过在依赖项页面中,要勾选RabbitMQ这个选项。

在核心配置文件中,配置RabbitMQ的相关连接信息。

  1. #配置RabbitMQ的相关连接信息
  2. spring.rabbitmq.host=192.168.40.130
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=root
  5. spring.rabbitmq.password=root

编写实现消息接收的接口和实现类。

接口中的这些方法分别对应 direct、fanout、topic三种类型的交换机,我这里测试这三种类型的交换机来发送接收消息。

  1. package com.szh.sprringboot.rabbitmq.service;
  2. /**
  3. *
  4. */
  5. public interface ReceiveService {
  6. void receiveMessage();
  7. void directReceive(String message);
  8. void fanoutReceive01(String message);
  9. void fanoutReceive02(String message);
  10. void topicReceive01(String message);
  11. void topicReceive02(String message);
  12. void topicReceive03(String message);
  13. }
  1. package com.szh.sprringboot.rabbitmq.service.impl;
  2. import com.szh.sprringboot.rabbitmq.service.ReceiveService;
  3. import org.springframework.amqp.core.AmqpTemplate;
  4. import org.springframework.amqp.rabbit.annotation.Exchange;
  5. import org.springframework.amqp.rabbit.annotation.Queue;
  6. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Service;
  10. /**
  11. *
  12. */
  13. @Service("receiveService")
  14. public class ReceiveServiceImpl implements ReceiveService {
  15. @Autowired
  16. private AmqpTemplate amqpTemplate;
  17. /**
  18. * receiveAndConvert()这个方法,每执行一次只能接收一次消息
  19. * 如果有消息进入,则不会自动接收消息(不建议使用)
  20. */
  21. @Override
  22. public void receiveMessage() {
  23. // String message= (String) amqpTemplate.receiveAndConvert("springbootDirectQueue");
  24. // System.out.println(message);
  25. }
  26. /**
  27. * @RabbitListener : 用于标记当前方法是一个RabbitMQ的消息监听方法,可以持续性的自动接收消息
  28. * @param message
  29. * 该方法不需要手动调用,Spring会自动运行这个监听方法
  30. *
  31. * 注意:如果该监听方法正常结束,那么Spring会自动确认消息
  32. * 如果出现异常,则Spring不会确认消息,该消息一直存在于消息队列中
  33. */
  34. @Override
  35. @RabbitListener(bindings = {
  36. @QueueBinding(
  37. value = @Queue(name = "springbootDirectQueue"),
  38. exchange = @Exchange(name = "springbootDirectExchange"),
  39. key = {"springbootDirectRouting"}
  40. )
  41. })
  42. public void directReceive(String message) {
  43. System.out.println(message);
  44. }
  45. @Override
  46. @RabbitListener(bindings = {
  47. @QueueBinding( //完成队列和交换机的绑定
  48. value = @Queue(), //创建一个队列,没有name属性,表示创建一个随即名称的消息队列
  49. exchange = @Exchange(name = "fanoutExchange",type = "fanout") //创建一个交换机
  50. )
  51. })
  52. public void fanoutReceive01(String message) {
  53. System.out.println(message);
  54. }
  55. @Override
  56. @RabbitListener(bindings = {
  57. @QueueBinding( //完成队列和交换机的绑定
  58. value = @Queue(), //创建一个队列,没有name属性,表示创建一个随即名称的消息队列
  59. exchange = @Exchange(name = "fanoutExchange",type = "fanout") //创建一个交换机
  60. )
  61. })
  62. public void fanoutReceive02(String message) {
  63. System.out.println(message);
  64. }
  65. @Override
  66. @RabbitListener(bindings = {
  67. @QueueBinding(
  68. value = @Queue("topic01"),
  69. exchange = @Exchange(name = "topicExchange",type = "topic"),
  70. key = {"aa"}
  71. )
  72. })
  73. public void topicReceive01(String message) {
  74. System.out.println("topic01 接收到的数据:" + message);
  75. }
  76. @Override
  77. @RabbitListener(bindings = {
  78. @QueueBinding(
  79. value = @Queue("topic02"),
  80. exchange = @Exchange(name = "topicExchange",type = "topic"),
  81. key = {"aa.*"}
  82. )
  83. })
  84. public void topicReceive02(String message) {
  85. System.out.println("topic02 接收到的数据:" + message);
  86. }
  87. @Override
  88. @RabbitListener(bindings = {
  89. @QueueBinding(
  90. value = @Queue("topic03"),
  91. exchange = @Exchange(name = "topicExchange",type = "topic"),
  92. key = {"aa.#"}
  93. )
  94. })
  95. public void topicReceive03(String message) {
  96. System.out.println("topic03 接收到的数据:" + message);
  97. }
  98. }

最后是SpringBoot项目的启动入口类。

  1. package com.szh.sprringboot.rabbitmq;
  2. import com.szh.sprringboot.rabbitmq.service.ReceiveService;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.ApplicationContext;
  6. @SpringBootApplication
  7. public class Application {
  8. public static void main(String[] args) {
  9. ApplicationContext context=SpringApplication.run(Application.class, args);
  10. ReceiveService service= (ReceiveService) context.getBean("receiveService");
  11. //service.receiveMessage();
  12. }
  13. }

3.测试结果

3.1 direct

先启动消息发送者工程,再启动消息接收者工程。

3.2 fanout

先启动消息接收者工程,再启动消息发送者工程。

因为这里fanout交换机中定义了两个消息队列,它是一对多、不需要绑定RoutingKey的,所以这些消息队列都会接收到消息数据。

3.3 topic

先启动消息接收者工程,再启动消息发送者工程。

因为这里topic交换机中定义了三个消息队列,它是一对多、需要绑定RoutingKey的,根据RoutingKey的不同会限制哪些消息队列能够接收到消息、哪些不能。当绑定的RoutingKey为aa时,只有BingKey为 aa、aa.# 这两个消息队列可以接收到(aa顾名思义、而aa.#是因为#表示0个或多个单词,aa.*接收不到是因为*仅能表示1个单词)。

3.4 RabbitMQ管控台中查看SpringBoot工程创建的交换机和消息队列

这里的消息队列只有direct、topic的,至于为什么没有fanout的,是因为fanout类型的交换机在消息发送/接收服务停止之后,对应的交换机还在,但是消息队列会自动清除掉。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/617097
推荐阅读
相关标签
  

闽ICP备14008679号