当前位置:   article > 正文

RabbitMQ的四种交换机模式_mq的四种模式

mq的四种模式

一、RabbitMQ的简单介绍

RabbitMQ作为一个消息队列,它负责提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全可靠。
消息(Message)由Client(客户端)发送,RabbitMQ接收到消息之后通过交换机转发到对应的队列上面。Worker会从队列中获取未被读取的数据处理。
消息处理的过程

二、RabbitMQ的四种交换机

  • 直连交换机:Direct exchange
  • 扇形交换机:Fanout exchange
  • 主体交换机:Topic exchange
  • 首部交换机:Headers exchange
1、直连交换机

直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。
这样当一个交换机绑定多个队列时,就会被送到对应的队列去处理。
直连交换机
如图所示:当指定routing_key为key1时,消息队列1和2都会收到消息,下面用伪代码演示一下(只做演示,相关依赖省略):

MQ配置类

@Configuration
public class MQConfig {
	//创建三个队列1,2,3
	//Queue的第一个参数为队列名称,第二个参数为是否持久存在
	 @Bean
    public Queue directQueue1() {
        return new Queue("queue1", true);
    }

    @Bean
    public Queue directQueue2() {
        return new Queue("queue2", true);
    }

    @Bean
    public Queue directQueue3() {
        return new Queue("queue3", true);
    }
	//创建直连交换机,参数为交换机的名称
	@Bean
    public DirectExchange directExchange() {
        return new DirectExchange("DIRECT_EXCHANGE");
    }
	//将三个队列都与该直连交换机绑定起来,并赋予上面说的binding_key(也可以说是routing_key)
	@Bean
    public Binding bindingDirectExchange1() {
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("key.1");
    }

    @Bean
    public Binding bindingDirectExchange2() {
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("key.1");
    }

    @Bean
    public Binding bindingDirectExchange3() {
        return BindingBuilder.bind(directQueue3()).to(directExchange()).with("key.2");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

MQSnder发送者类

@Service
public class MQSender {
	//注入AmqpTemplate接口,该接口定义了发送和接收消息的基本操作
	@Autowired
    AmqpTemplate amqpTemplate;
    
    public void send(String message) {
    	//第一个参数指将消息发送到该名称的交换机,第二个参数为对应的routing_key,第三个参数为发送的具体消息   
    	amqpTemplate.convertAndSend("DIRECT_EXCHANGE", "key.1", message);   				 
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

MQReceive消费者类

@Service
public class MQReceiver {
	private static final Logger logger = LoggerFactory.getLogger(MQReceiver.class);
	//此注解表示监听某个队列,参数为队列名
    @RabbitListener(queues = "queue1")
    public void receive1(String message) {
        logger.info("receive : dir1 message {}", message);
    }

    @RabbitListener(queues = "queue2")
    public void receive2(String message) {
        logger.info("receive : dir2 message {}", message);
    }

    @RabbitListener(queues = "queue3")
    public void receive3(String message) {
        logger.info("receive : dir3 message {}", message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

调用发送者的send方法后,发现结果是队列1和2收到了消息,符合预期在这里插入图片描述
适用场景:有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以派更多的资源去处理高优先级的队列。

2、扇形交换机

扇形交换机是最基本的交换机类型,它能做的事非常简单——广播消息,扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要"思考",所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
在这里插入图片描述
下面用伪代码演示一下。
MQ配置类

@Configuration
public class MQConfig {
	//创建三个队列1,2,3
	//Queue的第一个参数为队列名称,第二个参数为是否持久存在
	 @Bean
    public Queue fanoutQueue1() {
        return new Queue("queue1", true);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue("queue2", true);
    }

    @Bean
    public Queue fanoutQueue3() {
        return new Queue("queue3", true);
    }
	//创建扇形交换机,参数为交换机的名称
	@Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange ("FANOUT_EXCHANGE");
    }
	//将三个队列都与该交换机绑定起来,无需binding_key
	@Bean
    public Binding bindingFanoutExchange1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingFanoutExchange2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingFanoutExchange3() {
        return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

MQSnder发送者类

@Service
public class MQSender {
	//注入AmqpTemplate接口,该接口定义了发送和接收消息的基本操作
	@Autowired
    AmqpTemplate amqpTemplate;
    
    public void send(String message) {
    	//第一个参数指将消息发送到该名称的交换机,第二个参数为对应的routing_key(此时设置为空字符串即可),第三个参数为发送的具体消息   
    	amqpTemplate.convertAndSend("FANOUT_EXCHANGE", "", message);   				 
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

MQReceive消费者类

@Service
public class MQReceiver {
	private static final Logger logger = LoggerFactory.getLogger(MQReceiver.class);
	//此注解表示监听某个队列,参数为队列名
    @RabbitListener(queues = "queue1")
    public void receive1(String message) {
        logger.info("receive : fanout message {}", message);
    }

    @RabbitListener(queues = "queue2")
    public void receive2(String message) {
        logger.info("receive : fanout message {}", message);
    }

    @RabbitListener(queues = "queue3")
    public void receive3(String message) {
        logger.info("receive : fanout message {}", message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

调用MQSendsend()方法,三个队列都能接收到消息。
适用场景:需要给所有绑定该交换机的队列直接发送消息时使用。

3、主题交换机

直连交换机的routing_key方法非常简单,如果希望将一条消息发送给多个队列,那么这个交换机需要绑定非常多的routing_key,这样的话消息的管理就会非常的困难。
所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带制定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的队列上。
主题交换机的routing_key需要有一定的规则,交换机和队列绑定时候设置的binding_key需要采用*.#.*…的格式,每个部分用.分开,其中:

  • *表示一个单词
  • #表示任意数量(零个或多个)单词。
    假设有一条消息的routing_keycom.lrving.www,那么带有这样binding_key的几个队列都有收到消息:
  1. com…
  2. …www
  3. com.#

下面是网上的一张图,清楚描述了主题交换机的消息发送规则
在这里插入图片描述
当一个队列的绑定键为#的时候,这个队列将会无视消息的路由键,接收所有的消息。下面用伪代码演示一下
MQ配置类

@Configuration
public class MQConfig {
	@Bean
    public Queue topicQueue1() {
        return new Queue("TOPIC_QUEUE1", true);
    }
    @Bean
    public Queue topicQueue2() {
        return new Queue("TOPIC_QUEUE2", true);
    }
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("TOPIC_EXCHANGE");
    }
  	//将topicQueue1与topicExchange交换机绑定
    @Bean
    public Binding bindQueue1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1");
    }
    //将topicQueue2与topicExchange交换机绑定
    @Bean
    public Binding bindQueue2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

MQSnder发送者类

@Service
public class MQSender {
	//注入AmqpTemplate接口,该接口定义了发送和接收消息的基本操作
	@Autowired
    AmqpTemplate amqpTemplate;
   public void sendTopic(String message) {
        amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key1", message);
        amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "topic.key2", message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

MQReceive消费者类

@Service
public class MQReceiver {
	private static final Logger logger = LoggerFactory.getLogger(MQReceiver.class);
	@RabbitListener(queues = "TOPIC_QUEUE1")
    public void receiveQueue1(String message) {
        logger.info("receive : queue1 {}", message);
    }

    @RabbitListener(queues ="TOPIC_QUEUE2")
    public void receiveQueue2(String message) {
        logger.info("receive : queue2 {}", message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

调用MQSendersend()方法,发现队列1收到了一条消息,而队列2收到了两条消息,符合参数#的描述。

4、首部交换机

首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP请求中的请求头。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash内容匹配上的时候,消息就会被写入队列。
绑定交换机和队列的时候,Hash结构中要求携带一个键"x-match",这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅仅匹配一个键(any)就可以了。相比较直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。
下面用伪代码演示一下:
MQ配置类

@Configuration
public class MQConfig {
	 @Bean
    public Queue headersQueue() {
        return new Queue("HEADERS_QUEUE");
    }
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("HEADERS_EXCHANGE");
    }
    //将headersQueue与HeadersExchange交换机绑定
    @Bean
    public Binding bingHeadersQueue() {
    	//map为绑定的规则
        Map<String, Object> map = new HashMap<>();
        map.put("headers1", "value1");
        map.put("headers2", "value2");
        //whereAll表示需要满足所有条件
        return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

MQSnder发送者类

@Service
public class MQSender {
	//注入AmqpTemplate接口,该接口定义了发送和接收消息的基本操作
	@Autowired
    AmqpTemplate amqpTemplate;
   public void sendHeaders(String message) {
   		//配置消息规则
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("headers1", "value1");
        messageProperties.setHeader("headers2", "value2");
        //要发送的消息,第一个参数为具体的消息字节数组,第二个参数为消息规则
        Message msg = new Message(result.getBytes(), messageProperties);
        amqpTemplate.convertAndSend("HEADERS_EXCHANGE", "", msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

MQReceive消费者类

@Service
public class MQReceiver {
	private static final Logger logger = LoggerFactory.getLogger(MQReceiver.class);
   @RabbitListener(queues = "HEADERS_QUEUE")
    public void receiveHeadersQueue(byte[] message) {
        logger.info("receive : HeadersQueue {}", new String(message));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

调用MQSendersend()方法,发现队列收到了消息,符合预期。

参考资料

参考资料

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/763289
推荐阅读
相关标签
  

闽ICP备14008679号