当前位置:   article > 正文

RabbitMQ (二)--路由模式实战及常用概念_rabbitmq一个交换器可以绑定多个队列么

rabbitmq一个交换器可以绑定多个队列么

        上一章只是简单的介绍说明了一下rabbit mq的作用,以及spring boot 集成及简单的使用。里面有很多概念还没有来得及说明。本章节我们就来先了解一下rabbit的常用概念,并且对几种路由模式的使用方式

常用概念

        画图一直是硬伤,所以还是贴网图了,下面的图就是rabbit mq的工作模型。我们看完后面每一个含义,就大概了解rabbit的工作方式了

producer(消息生产者)

        往mq里面发送消息的程序,上一章节里面的ProducerController就是一个producer

consumer(消息消费者)

        消息的消费者程序,上一章的ConsumerDemo,这两个概念还是比较好理解

Connection(连接)

        生产者与消费者想要发送或者接收mq的消息,就首先需要连接到mq,connection就是这个连接,它的本质是一个tcp的长连接

Channel(信道)

        信道是在Connection上建立的虚拟连接,从图中可以看出,一个连接中有多个信道。rabbit   mq中的大部分操作都由信道完成。信道也是一个多线程的操作,主要是为了提高效率,毕竟一个连接所能支撑的操作肯定有限

Queue(队列)

        前面也提到过,就是一个先进先出的数据结构,生产者发送到队列中,消费者消费

Exchange(交换机)

        这个概念就相对比较重要了,可以看到生产者发送消息后的第一站就是交换机,负责根据不同的分发规则将消息分发到不同的队列上去(需要绑定(binding)队列)。消费者订阅了相关队列就可以接收到消息。那么它的分发规则就是我们接下来要重点讲的路由模式,一共四种:direct、fanout、topic、headers

小结

        了解了这几个概念再回头看图,基本就能明白 rabbit mq的工作方式:

1.生产者生产消息,通过Connection发送到mq中(也可以称为一个Broker)

2.交换机根据规则和绑定的队列进行分发到对应 queue(队列)中

3.消费者通过订阅对应queue,从连接的channel(信道)中接收到消息

4.ack确认(这个没有在图中体现,主要是做消息确认的,消费端告诉mq自己已经接收到了)

路由模式(交换机模式)

        当你在网上搜索rabbit 模式得时候,你会看到有的说四种,五种,六种得都有。我好难。。。。其实也不能怪他们,只是没有说清楚维度,下面就是为什么看到说四五六种都有的原因,希望大家看完能明白rabbit mq的模式划分

rabbitmq一共有六种消息模型,分别是:

        1.simple 简单模式

        2.work工作模式(资源的竞争)

        3.publish/subscribe发布订阅(共享资源)

        4.routing路由模式

        5.topic 主题模式(路由模式的一种)

        6.RPC (它并不是mq,所以很多人就排除掉它,认为只有五种,也没有问题)

        那么四种模式又是哪儿来的呢?

交换机的四种模式(direct、fanout、topic、headers)

说明一下,交换机还有一个默认交换机,default,它本质就是一个空名称的直连交换机,就没有单独去说明了,了解一下就行了,因为也有说交换机模式有五种的(心累)

direct(直连模式)

        路由到Routingkey与BindingKey完全匹配的队列中。一个交换器可以与多个队列绑定,同时一个交换器与一个队列绑定的时候可以使用多个BindingKey来多次绑定。

如果一个交换器只绑定一个队列,那么可以将Routingkey和BindingKey看成一个东西。

如果一个交换器绑定多个队列,则会把消息路由到Routingkey与BindingKey完全匹配的队列中。

1.定义队列

  1. package com.rabbit.config;
  2. import com.rabbit.constants.RabbitConstants;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @ClassName DirectExchangeConfig
  8. * @Description Direct 直连模式 声明队列,所有消息都通过队列转发
  9. * @Author andy
  10. * @Date 2022/7/16 16:35
  11. * @Version 1.0
  12. */
  13. @Configuration
  14. public class DirectExchangeConfig {
  15. /**
  16. * 声明队列 info
  17. * @return
  18. */
  19. @Bean
  20. Queue testDirectQueue() {
  21. return new Queue(RabbitConstants.DIRECT_QUEUE);
  22. }
  23. /**
  24. * 声明队列 debug
  25. * @return
  26. */
  27. @Bean
  28. Queue testDirectQueueDebug() {
  29. return new Queue(RabbitConstants.DIRECT_QUEUE_DEBUG);
  30. }
  31. /**
  32. * 声明交换器
  33. */
  34. @Bean
  35. DirectExchange getDirectExchange(){
  36. return new DirectExchange(RabbitConstants.DIRECT_EXCHANGE);
  37. }
  38. /**
  39. * 绑定队列到交换器,由交换器进行发送,with中routing key 路由键
  40. */
  41. @Bean
  42. Binding bindingDirect(){
  43. return BindingBuilder.bind(testDirectQueue()).to(getDirectExchange()).with("info");
  44. }
  45. @Bean
  46. Binding bindingDirectDebug(){
  47. return BindingBuilder.bind(testDirectQueueDebug()).to(getDirectExchange()).with("debug");
  48. }
  49. }

2.发送消息

  1. @GetMapping("direct-exchange")
  2. public void sendDirect(){
  3. String msg = "hello rabbit哈哈哈-direct";
  4. rabbitTemplate.convertAndSend(RabbitConstants.DIRECT_EXCHANGE,"info", msg);
  5. rabbitTemplate.convertAndSend(RabbitConstants.DIRECT_EXCHANGE,"debug", msg);
  6. }
  7. /**
  8. * 发送到指定队列中,因为默认路由其实也是direct,所以了解下
  9. @GetMapping("direct-exchange")
  10. public void sendDirect(){
  11. String msg = "hello rabbit哈哈哈-direct";
  12. rabbitTemplate.convertAndSend(RabbitConstants.DIRECT_QUEUE, msg);
  13. }
  14. */

3.消费端监听

  1. /**
  2. * Direct直连模式
  3. * 监听消息队列,记得注解里面用常量,否则报错
  4. */
  5. @RabbitListener(queues = RabbitConstants.DIRECT_QUEUE)
  6. public void listenerOne(String msg, Channel channel, Message message){
  7. log.info("接收到直连模式的消息:"+msg);
  8. }
  9. @RabbitListener(queues = RabbitConstants.DIRECT_QUEUE_DEBUG)
  10. public void listenerTwo(String msg, Channel channel, Message message){
  11. log.info("接收到直连模式的消息:"+msg);
  12. }

fanout(广播模式)

        把发送到该交换器的消息发送到所有与该交换器绑定的队列中。不需要指定Routingkey和BindingKey。比如两个队列绑定了同一个交换器,那么2个队列都会同时收到消息。

        这个上一章已经写过了,就不再写一遍了,可以自行翻阅上一章的内容

topic(通配符)

        topic与direct类型的交换器类似,也是将消息路由到Routingkey与BindingKey匹配的队列中,但它不是完全匹配,而是模糊匹配。

 1.topic配置设置

  1. package com.rabbit.config;
  2. import com.rabbit.constants.RabbitConstants;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.core.TopicExchange;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. /**
  10. * @ClassName TopicExchangeConfig
  11. * @Description Topic模式 类似direct
  12. * @Author andy
  13. * @Date 2022/7/16 16:35
  14. * @Version 1.0
  15. */
  16. @Configuration
  17. public class TopicExchangeConfig {
  18. /**
  19. * 声明队列
  20. * @return
  21. */
  22. @Bean
  23. Queue testTopicQueue() {
  24. return new Queue(RabbitConstants.TOPIC_QUEUE_INFO);
  25. }
  26. /**
  27. * 声明队列
  28. * @return
  29. */
  30. @Bean
  31. Queue testTopicQueueDebug() {
  32. return new Queue(RabbitConstants.TOPIC_QUEUE_DEBUG);
  33. }
  34. /**
  35. * 声明交换器
  36. */
  37. @Bean
  38. TopicExchange getTopicExchange(){
  39. return new TopicExchange(RabbitConstants.TOPIC_EXCHANGE);
  40. }
  41. /**
  42. * 绑定队列到交换器,由交换器进行发送,with中routing key 路由键
  43. */
  44. @Bean
  45. Binding bindingTopic(){
  46. //需要指定roytingKey,发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。
  47. return BindingBuilder.bind(testTopicQueue()).to(getTopicExchange()).with("*.info");
  48. }
  49. @Bean
  50. Binding bindingTopicDebug(){
  51. return BindingBuilder.bind(testTopicQueueDebug()).to(getTopicExchange()).with("topic.#");
  52. }
  53. }

 2.消息生产

  1. @GetMapping("topic-exchange")
  2. public void sendTopic(){
  3. //需要指定roytingKey,发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。
  4. String msg = "hello rabbit哈哈哈-topic";
  5. rabbitTemplate.convertAndSend(RabbitConstants.TOPIC_EXCHANGE,"*.info", "info--"+msg);
  6. rabbitTemplate.convertAndSend(RabbitConstants.TOPIC_EXCHANGE,"*.debug", "debug--"+msg);
  7. }

3.消费者监听

  1. /**
  2. * Topic模式
  3. * 监听消息队列,记得注解里面用常量,否则报错
  4. */
  5. @RabbitListener(queues = RabbitConstants.TOPIC_QUEUE_INFO)
  6. public void listenerTopicOne(String msg, Channel channel, Message message){
  7. log.info("接收到topic模式的消息:"+msg);
  8. }
  9. @RabbitListener(queues = RabbitConstants.TOPIC_QUEUE_DEBUG)
  10. public void listenerTopicTwo(String msg, Channel channel, Message message){
  11. log.info("接收到topic模式的消息:"+msg);
  12. }

headers(头)

        headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

1.configer

  1. package com.rabbit.config;
  2. import com.rabbit.constants.RabbitConstants;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.HeadersExchange;
  6. import org.springframework.amqp.core.Queue;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. /**
  12. * @ClassName HeadersExchangeConfig
  13. * @Description TODO
  14. * @Author andy
  15. * @Date 2022/7/20 17:59
  16. * @Version 1.0
  17. */
  18. @Configuration
  19. public class HeadersExchangeConfig {
  20. @Bean
  21. public Queue queueN1() {
  22. return new Queue(RabbitConstants.HEADER_QUEUE_ONE);
  23. }
  24. @Bean
  25. public Queue queueN2() {
  26. return new Queue(RabbitConstants.HEADER_QUEUE_TWO);
  27. }
  28. @Bean
  29. public HeadersExchange headersExchange(){
  30. return new HeadersExchange(RabbitConstants.HEADER_EXCHANGE);
  31. }
  32. /**
  33. * header的队列匹配可以用mathces(匹配)和exisits(包含)
  34. * return BindingBuilder.bind(queue()).to(exchange()).where("busTyp").matches("1");
  35. * @return
  36. */
  37. @Bean
  38. public Binding queueN1Binding(){
  39. Map<String,Object> map = new HashMap<>();
  40. map.put("queueName","queueN1");
  41. map.put("bindType","whereAll");
  42. //全匹配
  43. return BindingBuilder.bind(queueN1()).to(headersExchange()).whereAll(map).match();
  44. }
  45. @Bean
  46. public Binding queueN2Binding(){
  47. Map<String,Object> map = new HashMap<>();
  48. map.put("queueName","queueN2");
  49. map.put("bindType","whereAny");
  50. //有一个就行
  51. return BindingBuilder.bind(queueN2()).to(headersExchange()).whereAny(map).match();
  52. }
  53. }

2.生产者

  1. @GetMapping("header-exchange")
  2. public void sendHeader() throws UnsupportedEncodingException {
  3. String messageStr = "hello rabbit哈哈哈-header";
  4. MessageProperties messageProperties = new MessageProperties();
  5. messageProperties.setHeader("queueName","queueN1");
  6. messageProperties.setHeader("bindType","whereAll");
  7. Message message = new Message(messageStr.getBytes(), messageProperties);
  8. rabbitTemplate.send(RabbitConstants.HEADER_EXCHANGE,null,message);
  9. }
  10. @GetMapping("header-exchange1")
  11. public void sendHeader1() throws UnsupportedEncodingException {
  12. String messageStr = "hello rabbit哈哈哈-header1";
  13. MessageProperties messageProperties = new MessageProperties();
  14. messageProperties.setHeader("queueName","queueN2");
  15. messageProperties.setHeader("bindType","whereAll");
  16. Message message = new Message(messageStr.getBytes(), messageProperties);
  17. rabbitTemplate.send(RabbitConstants.HEADER_EXCHANGE,null,message);
  18. }

3.消费者

  1. /**
  2. * headers模式
  3. * 监听消息队列,记得注解里面用常量,否则报错
  4. */
  5. @RabbitListener(queues=RabbitConstants.HEADER_QUEUE_ONE)
  6. public void headerOne(byte[] bytes) {
  7. log.info("header message : " +new String(bytes));
  8. }
  9. @RabbitListener(queues=RabbitConstants.HEADER_QUEUE_TWO)
  10. public void headerTwo(byte[] bytes) {
  11. log.info("header message : " +new String(bytes));
  12. }

代码就暂时不上传了,等把rabbit写完了再一起上传

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/431759
推荐阅读
相关标签
  

闽ICP备14008679号