当前位置:   article > 正文

RabbitMQ_五种模式(整合SpringBoot)_rabbitmq boot

rabbitmq boot

1.Simple("Hello World")

构成:生产者、消费者、消息队列 

配置类

构造函数参数:name durable exclusive autoDelete

仅创建队列,不创建交换机,也不进行队列和交换机的绑定

注:配置类置于生产者端或消费者端无硬性要求,但置于消费者端更有利于测试代码,原因如下:

若先启动生产者端发送消息,再启动消费者端接收消息,则最先连接到RabbitMQ服务器的消费者将接收所有的消息,所以通常而言需要先启动消费者端;而先启动消费者端时,若将配置类置于生产者端,则可能会出现消费者端中使用,但尚未被配置到RabbitMQ的配置(若配置类在生产者端的话),所以将配置类置于消费者端更有利于测试代码,另外,在项目启动时就进行配置显然是很好的选择

  1. @Configuration
  2. public class HelloWorldConfiguration {
  3. // 创建消息队列
  4. // 不需要交换机(使用默认交换机direct)
  5. @Bean
  6. public Queue helloWorldQueue(){
  7. return new Queue("hello_world_queue", true, false, false);
  8. }
  9. }

生产者

convertAndSend方法参数:exchange routingKey message

默认交换机为direct交换机,将routingKey设置为队列名可将消息直接发送至该队列

  1. @Service
  2. public class SayHelloWorldService {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. public void say(){
  6. rabbitTemplate.convertAndSend("", "hello_world_queue", "hello, world");
  7. System.out.println("消息发送成功");
  8. }
  9. }

消费者

@RabbitListener 监听队列

@RabbitHandler 消息处理

  1. @Service
  2. @RabbitListener(queues = "hello_world_queue")
  3. public class HelloWorldService {
  4. @RabbitHandler
  5. public void message(String message){
  6. System.out.println("消息: " + message);
  7. }
  8. }

测试

  1. @Autowired
  2. SayHelloWorldService sayHelloWorldService;
  3. @Test
  4. void helloWorld(){
  5. sayHelloWorldService.say();
  6. }

 

2.Work Queues(工作模式)

构成:生产者、消费者、消息队列(较Simple模式而言,存在多个消费者)

配置类

  1. @Configuration
  2. public class WorkQueuesConfiguration {
  3. // 创建消息队列
  4. // 不需要交换机(使用默认交换机direct)
  5. @Bean
  6. public Queue workQueue(){
  7. return new Queue("work_queue", true, false, false);
  8. }
  9. }

生产者

  1. @Service
  2. public class WorkQueuesService {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. public void workQueues(String message){
  6. rabbitTemplate.convertAndSend("", "work_queue", message);
  7. System.out.println("消息发送完成");
  8. }
  9. }

消费者 

  1. @Service
  2. @RabbitListener(queues = "work_queue")
  3. public class Worker01Service {
  4. @RabbitHandler
  5. public void worker01(String message){
  6. System.out.println("Worker01收到消息: " + message);
  7. }
  8. }
  1. @Service
  2. @RabbitListener(queues = "work_queue")
  3. public class Worker02Service {
  4. @RabbitHandler
  5. public void worker01(String message){
  6. System.out.println("Worker02收到消息: " + message);
  7. }
  8. }
  1. @Service
  2. @RabbitListener(queues = "work_queue")
  3. public class Worker03Service {
  4. @RabbitHandler
  5. public void worker03(String message){
  6. System.out.println("Worker03收到消息: " + message);
  7. }
  8. }

测试 

可以看到消息被平分给了三个消费者

 

3.Publish/Subscribe(发布/订阅模式)

构成:生产者、消费者、消息队列、fanout交换机

相较于上述模式,区别在于具有自定义的交换机,类型为fanout

配置类

消息队列与交换机进行绑定,当有生产者向交换机投递消息时,交换机将会把消息转发至所有与其绑定的消息队列

  1. @Configuration
  2. public class FanoutConfiguration {
  3. //订单交换机
  4. @Bean
  5. public FanoutExchange fanoutExchange(){
  6. return new FanoutExchange("fanout_exchange_order", true, false);
  7. }
  8. //消息通知队列(短信与邮件)
  9. @Bean
  10. public Queue fanoutSMSQueue(){
  11. return new Queue("sms_queue", true, false, false);
  12. }
  13. @Bean
  14. public Queue fanoutEmailQueue(){
  15. return new Queue("email_queue", true, false, false);
  16. }
  17. //绑定交换机与队列
  18. @Bean
  19. public Binding fanoutSMSBinding(){
  20. return BindingBuilder.bind(fanoutSMSQueue()).to(fanoutExchange());
  21. }
  22. @Bean
  23. public Binding fanoutEmailBinding(){
  24. return BindingBuilder.bind(fanoutEmailQueue()).to(fanoutExchange());
  25. }
  26. }

生产者

此处routingKey为空(fanout交换机无需routingKey)

  1. @Service
  2. public class FanoutOrderService {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. public void makeOrder(String userID, String producerID, int num){
  6. // 1.根据需求查询仓库 判断是否能满足需求
  7. // 2.若能满足则生成订单
  8. String orderID = UUID.randomUUID().toString();
  9. System.out.println("成功生成订单");
  10. // 3.通过RabbitMQ发送消息
  11. String exchangeName = "fanout_exchange_order";
  12. String routingKey = "";
  13. rabbitTemplate.convertAndSend(exchangeName, routingKey, orderID);
  14. System.out.println("订单发送成功");
  15. }
  16. }

消费者

  1. @Service
  2. // @RabbitListener 监听消息队列
  3. @RabbitListener(queues = "email_queue")
  4. public class EmailMessageService {
  5. // @RabbitHandler 消息处理(接收消息)
  6. @RabbitHandler
  7. public void receiveEmailMessage(String message){
  8. System.out.println("接收到Email消息: " + message);
  9. }
  10. }
  1. @Service
  2. // @RabbitListener 监听消息队列
  3. @RabbitListener(queues = "sms_queue")
  4. public class SMSMessageService {
  5. // @RabbitHandler 消息处理(接收消息)
  6. @RabbitHandler
  7. public void receiveSMSMessage(String message){
  8. System.out.println("接收到SMS消息: " + message);
  9. }
  10. }

测试 

  1. @Autowired
  2. FanoutOrderService fanoutOrderService;
  3. @Test
  4. void fanoutOrder() {
  5. fanoutOrderService.makeOrder("1", "1", 1);
  6. }

4.Routing(路由模式)

构成:生产者、消费者、消息队列、direct交换机

相较于Publish/Subscribe模式,区别在于交换机类型,direct交换机支持以routingKey标识并分类消息队列

配置类

绑定消息队列与交换机时,还需要绑定routingKey,交换机将根据routingKey标识并分类消息队列;发送消息时,需要指明routingKey,交换机会根据routingKey将消息转发至对应的消息队列处

  1. @Configuration
  2. public class DirectConfiguration {
  3. //订单交换机
  4. @Bean
  5. public DirectExchange directExchange(){
  6. return new DirectExchange("direct_exchange_order", true, false);
  7. }
  8. //消息通知队列(短信与邮件)
  9. @Bean
  10. public Queue directSMSQueue(){
  11. return new Queue("sms_queue", true, false, false);
  12. }
  13. @Bean
  14. public Queue directEmailQueue(){
  15. return new Queue("email_queue", true, false, false);
  16. }
  17. //绑定交换机与队列
  18. @Bean
  19. public Binding directSMSBinding(){
  20. return BindingBuilder.bind(directSMSQueue()).to(directExchange()).with("sms");
  21. }
  22. @Bean
  23. public Binding directEmailBinding(){
  24. return BindingBuilder.bind(directEmailQueue()).to(directExchange()).with("email");
  25. }
  26. }

生产者

此处需要指明routingKey

  1. @Service
  2. public class DirectOrderService {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. public void makeOrder(String userID, String producerID, int num){
  6. // 1.根据需求查询仓库 判断是否能满足需求
  7. // 2.若能满足则生成订单
  8. String orderID = UUID.randomUUID().toString();
  9. System.out.println("成功生成订单");
  10. // 3.通过RabbitMQ发送消息
  11. String exchangeName = "direct_exchange_order";
  12. String routingKey01 = "sms";
  13. String routingKey02 = "email";
  14. rabbitTemplate.convertAndSend(exchangeName, routingKey01, orderID + " sms");
  15. rabbitTemplate.convertAndSend(exchangeName, routingKey02, orderID + " email");
  16. System.out.println("订单发送成功");
  17. }
  18. }

 消费者

  1. @Service
  2. // @RabbitListener 监听消息队列
  3. @RabbitListener(queues = "email_queue")
  4. public class EmailMessageService {
  5. // @RabbitHandler 消息处理(接收消息)
  6. @RabbitHandler
  7. public void receiveEmailMessage(String message){
  8. System.out.println("接收到Email消息: " + message);
  9. }
  10. }
  1. @Service
  2. // @RabbitListener 监听消息队列
  3. @RabbitListener(queues = "sms_queue")
  4. public class SMSMessageService {
  5. // @RabbitHandler 消息处理(接收消息)
  6. @RabbitHandler
  7. public void receiveSMSMessage(String message){
  8. System.out.println("接收到SMS消息: " + message);
  9. }
  10. }

测试

  1. @Autowired
  2. DirectOrderService directOrderService;
  3. @Test
  4. void directOrder() {
  5. directOrderService.makeOrder("1", "1", 1);
  6. }

5.Topic(主题模式)

构成:生产者、消费者、消息队列、topic交换机

相较于Routing模式,区别在于交换机类型,topic交换机可模糊匹配routingKey

关于模糊匹配规则

* 表示仅一级 #表示零级或多级(以.分级 级的内容可以为空)
举例:队列1 2的routing key分别为 *.number.* 与 #.number.#
   x.number.y 将匹配队列1 2
   .number. 将匹配队列2
   x.number. 将匹配队列2
   .number.y 将匹配队列2

配置类

此处routingKey以模糊匹配规则定义

  1. @Configuration
  2. public class TopicConfiguration {
  3. //订单交换机
  4. @Bean
  5. public TopicExchange topicExchange(){
  6. return new TopicExchange("topic_exchange_order", true, false);
  7. }
  8. //消息通知队列(短信与邮件)
  9. @Bean
  10. public Queue topicSMSQueue(){
  11. return new Queue("sms_queue", true, false, false);
  12. }
  13. @Bean
  14. public Queue topicEmailQueue(){
  15. return new Queue("email_queue", true, false, false);
  16. }
  17. //绑定交换机与队列
  18. @Bean
  19. public Binding topicSMSBinding(){
  20. return BindingBuilder.bind(topicSMSQueue()).to(topicExchange()).with("*.sms.*");
  21. }
  22. @Bean
  23. public Binding topicEmailBinding(){
  24. return BindingBuilder.bind(topicEmailQueue()).to(topicExchange()).with("#.email.#");
  25. }
  26. }

生产者

  1. @Service
  2. public class TopicOrderService {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. public void makeOrder(String userID, String producerID, int num){
  6. // 1.根据需求查询仓库 判断是否能满足需求
  7. // 2.若能满足则生成订单
  8. String orderID = UUID.randomUUID().toString();
  9. System.out.println("成功生成订单");
  10. // 3.通过RabbitMQ发送消息
  11. String exchangeName = "topic_exchange_order";
  12. String routingKey01 = "xxx.sms.yyy";
  13. String routingKey02 = ".email.";
  14. rabbitTemplate.convertAndSend(exchangeName, routingKey01, orderID + " sms");
  15. rabbitTemplate.convertAndSend(exchangeName, routingKey02, orderID + " email");
  16. System.out.println("订单发送成功");
  17. }
  18. }

 消费者

  1. @Service
  2. // @RabbitListener 监听消息队列
  3. @RabbitListener(queues = "email_queue")
  4. public class EmailMessageService {
  5. // @RabbitHandler 消息处理(接收消息)
  6. @RabbitHandler
  7. public void receiveEmailMessage(String message){
  8. System.out.println("接收到Email消息: " + message);
  9. }
  10. }
  1. @Service
  2. // @RabbitListener 监听消息队列
  3. @RabbitListener(queues = "sms_queue")
  4. public class SMSMessageService {
  5. // @RabbitHandler 消息处理(接收消息)
  6. @RabbitHandler
  7. public void receiveSMSMessage(String message){
  8. System.out.println("接收到SMS消息: " + message);
  9. }
  10. }

测试

可以看到,模糊匹配成功

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/582171
推荐阅读
相关标签
  

闽ICP备14008679号