赞
踩
上一章只是简单的介绍说明了一下rabbit mq的作用,以及spring boot 集成及简单的使用。里面有很多概念还没有来得及说明。本章节我们就来先了解一下rabbit的常用概念,并且对几种路由模式的使用方式
画图一直是硬伤,所以还是贴网图了,下面的图就是rabbit mq的工作模型。我们看完后面每一个含义,就大概了解rabbit的工作方式了
往mq里面发送消息的程序,上一章节里面的ProducerController就是一个producer
消息的消费者程序,上一章的ConsumerDemo,这两个概念还是比较好理解
生产者与消费者想要发送或者接收mq的消息,就首先需要连接到mq,connection就是这个连接,它的本质是一个tcp的长连接
信道是在Connection上建立的虚拟连接,从图中可以看出,一个连接中有多个信道。rabbit mq中的大部分操作都由信道完成。信道也是一个多线程的操作,主要是为了提高效率,毕竟一个连接所能支撑的操作肯定有限
前面也提到过,就是一个先进先出的数据结构,生产者发送到队列中,消费者消费
这个概念就相对比较重要了,可以看到生产者发送消息后的第一站就是交换机,负责根据不同的分发规则将消息分发到不同的队列上去(需要绑定(binding)队列)。消费者订阅了相关队列就可以接收到消息。那么它的分发规则就是我们接下来要重点讲的路由模式,一共四种:direct、fanout、topic、headers
了解了这几个概念再回头看图,基本就能明白 rabbit mq的工作方式:
1.生产者生产消息,通过Connection发送到mq中(也可以称为一个Broker)
2.交换机根据规则和绑定的队列进行分发到对应 queue(队列)中
3.消费者通过订阅对应queue,从连接的channel(信道)中接收到消息
4.ack确认(这个没有在图中体现,主要是做消息确认的,消费端告诉mq自己已经接收到了)
当你在网上搜索rabbit 模式得时候,你会看到有的说四种,五种,六种得都有。我好难。。。。其实也不能怪他们,只是没有说清楚维度,下面就是为什么看到说四五六种都有的原因,希望大家看完能明白rabbit mq的模式划分
1.simple 简单模式
2.work工作模式(资源的竞争)
3.publish/subscribe发布订阅(共享资源)
4.routing路由模式
5.topic 主题模式(路由模式的一种)
6.RPC (它并不是mq,所以很多人就排除掉它,认为只有五种,也没有问题)
那么四种模式又是哪儿来的呢?
说明一下,交换机还有一个默认交换机,default,它本质就是一个空名称的直连交换机,就没有单独去说明了,了解一下就行了,因为也有说交换机模式有五种的(心累)
路由到Routingkey与BindingKey完全匹配的队列中。一个交换器可以与多个队列绑定,同时一个交换器与一个队列绑定的时候可以使用多个BindingKey来多次绑定。
如果一个交换器只绑定一个队列,那么可以将Routingkey和BindingKey看成一个东西。
如果一个交换器绑定多个队列,则会把消息路由到Routingkey与BindingKey完全匹配的队列中。
1.定义队列
- package com.rabbit.config;
-
- import com.rabbit.constants.RabbitConstants;
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @ClassName DirectExchangeConfig
- * @Description Direct 直连模式 声明队列,所有消息都通过队列转发
- * @Author andy
- * @Date 2022/7/16 16:35
- * @Version 1.0
- */
- @Configuration
- public class DirectExchangeConfig {
-
- /**
- * 声明队列 info
- * @return
- */
- @Bean
- Queue testDirectQueue() {
- return new Queue(RabbitConstants.DIRECT_QUEUE);
- }
- /**
- * 声明队列 debug
- * @return
- */
- @Bean
- Queue testDirectQueueDebug() {
- return new Queue(RabbitConstants.DIRECT_QUEUE_DEBUG);
- }
-
- /**
- * 声明交换器
- */
- @Bean
- DirectExchange getDirectExchange(){
- return new DirectExchange(RabbitConstants.DIRECT_EXCHANGE);
- }
-
- /**
- * 绑定队列到交换器,由交换器进行发送,with中routing key 路由键
- */
- @Bean
- Binding bindingDirect(){
- return BindingBuilder.bind(testDirectQueue()).to(getDirectExchange()).with("info");
- }
- @Bean
- Binding bindingDirectDebug(){
- return BindingBuilder.bind(testDirectQueueDebug()).to(getDirectExchange()).with("debug");
- }
-
- }
2.发送消息
- @GetMapping("direct-exchange")
- public void sendDirect(){
- String msg = "hello rabbit哈哈哈-direct";
- rabbitTemplate.convertAndSend(RabbitConstants.DIRECT_EXCHANGE,"info", msg);
- rabbitTemplate.convertAndSend(RabbitConstants.DIRECT_EXCHANGE,"debug", msg);
- }
- /**
- * 发送到指定队列中,因为默认路由其实也是direct,所以了解下
- @GetMapping("direct-exchange")
- public void sendDirect(){
- String msg = "hello rabbit哈哈哈-direct";
- rabbitTemplate.convertAndSend(RabbitConstants.DIRECT_QUEUE, msg);
- }
- */
3.消费端监听
- /**
- * Direct直连模式
- * 监听消息队列,记得注解里面用常量,否则报错
- */
- @RabbitListener(queues = RabbitConstants.DIRECT_QUEUE)
- public void listenerOne(String msg, Channel channel, Message message){
- log.info("接收到直连模式的消息:"+msg);
- }
-
- @RabbitListener(queues = RabbitConstants.DIRECT_QUEUE_DEBUG)
- public void listenerTwo(String msg, Channel channel, Message message){
- log.info("接收到直连模式的消息:"+msg);
- }
把发送到该交换器的消息发送到所有与该交换器绑定的队列中。不需要指定Routingkey和BindingKey。比如两个队列绑定了同一个交换器,那么2个队列都会同时收到消息。
这个上一章已经写过了,就不再写一遍了,可以自行翻阅上一章的内容
topic(通配符)
topic与direct类型的交换器类似,也是将消息路由到Routingkey与BindingKey匹配的队列中,但它不是完全匹配,而是模糊匹配。
1.topic配置设置
- package com.rabbit.config;
-
- import com.rabbit.constants.RabbitConstants;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @ClassName TopicExchangeConfig
- * @Description Topic模式 类似direct
- * @Author andy
- * @Date 2022/7/16 16:35
- * @Version 1.0
- */
- @Configuration
- public class TopicExchangeConfig {
-
- /**
- * 声明队列
- * @return
- */
- @Bean
- Queue testTopicQueue() {
- return new Queue(RabbitConstants.TOPIC_QUEUE_INFO);
- }
- /**
- * 声明队列
- * @return
- */
- @Bean
- Queue testTopicQueueDebug() {
- return new Queue(RabbitConstants.TOPIC_QUEUE_DEBUG);
- }
-
- /**
- * 声明交换器
- */
- @Bean
- TopicExchange getTopicExchange(){
- return new TopicExchange(RabbitConstants.TOPIC_EXCHANGE);
- }
-
- /**
- * 绑定队列到交换器,由交换器进行发送,with中routing key 路由键
- */
- @Bean
- Binding bindingTopic(){
- //需要指定roytingKey,发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。
- return BindingBuilder.bind(testTopicQueue()).to(getTopicExchange()).with("*.info");
- }
- @Bean
- Binding bindingTopicDebug(){
- return BindingBuilder.bind(testTopicQueueDebug()).to(getTopicExchange()).with("topic.#");
- }
-
- }
2.消息生产
- @GetMapping("topic-exchange")
- public void sendTopic(){
- //需要指定roytingKey,发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。
- String msg = "hello rabbit哈哈哈-topic";
- rabbitTemplate.convertAndSend(RabbitConstants.TOPIC_EXCHANGE,"*.info", "info--"+msg);
- rabbitTemplate.convertAndSend(RabbitConstants.TOPIC_EXCHANGE,"*.debug", "debug--"+msg);
- }
3.消费者监听
- /**
- * Topic模式
- * 监听消息队列,记得注解里面用常量,否则报错
- */
- @RabbitListener(queues = RabbitConstants.TOPIC_QUEUE_INFO)
- public void listenerTopicOne(String msg, Channel channel, Message message){
- log.info("接收到topic模式的消息:"+msg);
- }
-
- @RabbitListener(queues = RabbitConstants.TOPIC_QUEUE_DEBUG)
- public void listenerTopicTwo(String msg, Channel channel, Message message){
- log.info("接收到topic模式的消息:"+msg);
- }
headers(头)
headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
1.configer
- package com.rabbit.config;
-
- import com.rabbit.constants.RabbitConstants;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.HeadersExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @ClassName HeadersExchangeConfig
- * @Description TODO
- * @Author andy
- * @Date 2022/7/20 17:59
- * @Version 1.0
- */
- @Configuration
- public class HeadersExchangeConfig {
-
- @Bean
- public Queue queueN1() {
-
- return new Queue(RabbitConstants.HEADER_QUEUE_ONE);
- }
-
- @Bean
- public Queue queueN2() {
-
- return new Queue(RabbitConstants.HEADER_QUEUE_TWO);
-
- }
-
- @Bean
- public HeadersExchange headersExchange(){
-
- return new HeadersExchange(RabbitConstants.HEADER_EXCHANGE);
- }
-
- /**
- * header的队列匹配可以用mathces(匹配)和exisits(包含)
- * return BindingBuilder.bind(queue()).to(exchange()).where("busTyp").matches("1");
- * @return
- */
- @Bean
- public Binding queueN1Binding(){
-
- Map<String,Object> map = new HashMap<>();
- map.put("queueName","queueN1");
- map.put("bindType","whereAll");
- //全匹配
- return BindingBuilder.bind(queueN1()).to(headersExchange()).whereAll(map).match();
- }
-
- @Bean
- public Binding queueN2Binding(){
-
- Map<String,Object> map = new HashMap<>();
- map.put("queueName","queueN2");
- map.put("bindType","whereAny");
- //有一个就行
- return BindingBuilder.bind(queueN2()).to(headersExchange()).whereAny(map).match();
- }
-
- }
2.生产者
- @GetMapping("header-exchange")
- public void sendHeader() throws UnsupportedEncodingException {
- String messageStr = "hello rabbit哈哈哈-header";
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setHeader("queueName","queueN1");
- messageProperties.setHeader("bindType","whereAll");
- Message message = new Message(messageStr.getBytes(), messageProperties);
- rabbitTemplate.send(RabbitConstants.HEADER_EXCHANGE,null,message);
- }
- @GetMapping("header-exchange1")
- public void sendHeader1() throws UnsupportedEncodingException {
- String messageStr = "hello rabbit哈哈哈-header1";
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setHeader("queueName","queueN2");
- messageProperties.setHeader("bindType","whereAll");
- Message message = new Message(messageStr.getBytes(), messageProperties);
- rabbitTemplate.send(RabbitConstants.HEADER_EXCHANGE,null,message);
- }
3.消费者
- /**
- * headers模式
- * 监听消息队列,记得注解里面用常量,否则报错
- */
- @RabbitListener(queues=RabbitConstants.HEADER_QUEUE_ONE)
- public void headerOne(byte[] bytes) {
- log.info("header message : " +new String(bytes));
- }
-
- @RabbitListener(queues=RabbitConstants.HEADER_QUEUE_TWO)
- public void headerTwo(byte[] bytes) {
- log.info("header message : " +new String(bytes));
- }
代码就暂时不上传了,等把rabbit写完了再一起上传
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。