赞
踩
首先,我们先了解一下RabbitMQ的含义。RabbitMQ是一个开源的消息中间件,它实现了高级消息队列协议(AMQP)用于进行异步通信。
这里简单的介绍一下异步通信:异步通信是一种通信模式,其中发送方和接收方的操作不是同步进行的。在异步通信中,发送方向接收方发送消息,然后继续执行其他操作,而不必等待接收方的响应。接收方在接收到消息后,可以处理消息,并在处理完成后向发送方发送响应。(举个例子:就好比你在微信上向一个好友发了一条消息,你的好友不一定要立即回复你,而是他有时间了再回复你,你也不是一直守着等他回复你。)
RabbitMQ基于消息队列的模式,通过将消息发送到队列中,然后由消费者从队列中取出并处理这些消息。主要的概念如下:
Producer(生产者):负责发送消息到RabbitMQ的队列中。
Consumer(消费者):从RabbitMQ的队列中接收并处理消息。
Queue(队列):用于存储消息的缓冲区,生产者将消息发送到队列中,消费者从队列中接收消息。
Exchange(交换机):接收来自生产者的消息,并根据路由规则将消息发送到一个或多个队列中。
Binding(绑定):用于将交换机和队列进行绑定,指定消息的路由规则。
Routing Key(路由键):用于将消息从交换机路由到特定的队列。
你要先对着这些概念有个印象,接下来让我们用编码的方式来进行实现。
本次我们使用JDK17、spring boot3.0、RabbitMQ3.9.11
本次采用父子工程的形式,一个父项目下面有两个子模块。父项目是一个空的maven项目,子模块为:生产者模块(Producer)和消费者模块(Consumer)两个子模块都是spring boot项目。在两个子模块中引入RabbitMQ的依赖AMQP,
AMQP是一种通信协议(类似于HTTP请求协议)RabbitMQ遵从AMQP,而spring boot整合过AMQP,所以只要引入AMQP的依赖就可以实现RabbitMQ,两个子模块的pom.xml文件如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- AMQP--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
接下来,我们使用docker来创建RabbitMQ容器(默认已经安装过docker了):
docker run -d --name my-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
运行这条命令就可以创建一个docker容器,解释一下我们会用到的两个端口,
5672:是RabbitMQ默认的AMQP协议端口,用于客户端应用程序与RabbitMQ服务之间的通信。(服务端)
15672:是RabbitMQ的Web管理界面的默认端口,用于通过浏览器访问RabbitMQ的管理界面(客户端)
使用http://虚拟机地址:15672 就可以访问到。第一次登录,用户名和密码,你可以任意填,但是填入之后就会保存,下一次登录,就需要用到这次填的用户名和密码。
这就是登录成功的页面,接下来,我们根据这个用户来进行一系列的操作。
在我们两个子项目的配置文件中加入关于RabbitMQ的配置信息:
spring: rabbitmq: port: 5672 host: 192.168.231.110 username: zhangqiao password: 123456 virtual-host: /
host:你的虚拟机地址
post:RabbitMQ的服务端口号,一般为5672
username:用户名
password:密码
virtual-host:虚拟主机号,类似于分组,用来隔离不同的使用场景,每个虚拟主机都是一个独立的消息代理
接下来我先解释一下前面提到的6个概念,
生产者:见名知意,就是生产消息的一方。对应的模块为Producer,作用是发送消息
消费者:接收消息的一方,对应的模块为Consumer,接收Producer发送的消息。
注意:生产者和消费者的身份是相对的,A要对B发消息,A是生产者,B是消费者。不能单指某模块为生产者或消费者,且这两者的身份因为业务的不同,随时可以发生调换。
Queue(队列):用于存储消息的缓冲区,生产者将消息发送到队列中,消费者从队列中接收消息。但是我们一般不这样做,这样太直接了,而且也不能在进行其他的操作。这个时候我们会引入一个中间量:Exchange(交换机)。
Exchange(交换机):接收来自生产者的消息,并根据路由规则将消息发送到一个或多个队列中。
Binding(绑定):用于将交换机和队列进行绑定,指定消息的路由规则。
Routing Key(路由键):用于将消息从交换机路由到特定的队列。(并不是交换机与队列绑定了,就一定会将消息发送给队列。绑定只是一个前提,还要满足路由键(某些指定的条件)才行)
现在,我想你已经了解了它们之间的关系,生产者将消息发送给交换机,交换机与指定的对列进行绑定,并通过路由键判定交换机是否发送消息给队列。消费者监听队列,就能获得生产者发送的消息。
生产者与交换机、交换机与队列、队列与消费者;它们之间都是多对多的关系。
一个生产者可以绑定多个交换机,一个交换机也可以被多个生产者绑定。交换机与队列、队列与消费者之间亦是如此。
常用的有三种类型的交换机:
Direct、Topic、Fanout交换机,下面我们会进行详细介绍;
接下来在idea中编写代码:
首先,我们编写生产者(Producer)模块代码:按照我们之前的逻辑,生产者是要往交换机发送消息的,那么我们的首先拥有交换机才行。在RabbitMQ中创建交换机一般有两种常用方式,接下来我们挨个实现。
1、使用bean的方式新建交换机和队列,并指定绑定关系
@Configuration
public class RabbitMQConfig {@Bean
public DirectExchange directExchange() {
return new DirectExchange("zhang.direct01");
}@Bean
public Queue defaultQueue() {
return new Queue("zhang.queue01");
}@Bean
public Binding defaultBinding(Queue defaultQueue, DirectExchange directExchange) {
return BindingBuilder.bind(defaultQueue).to(directExchange).with("red");
}
}
创建一个Direct交换机zhang.direct01,创建一个消息队列zhang.queue01,将他们进行绑定,指定他们之间的路由键为red。
2、使用注解建立交换机和队列,并指定绑定关系
@RabbitListener(bindings = @QueueBinding( value = @Queue("zhang.queue01"), exchange = @Exchange(value = "zhang.direct01",type = "direct"), key = "red" ))
这个注解必须要依托在方法上,不能独自使用。
@Queue("zhang.queue01"):新建一个zhang.queue01队列。(如果已存在,不做任何处理)
@Exchange(value = "zhang.direct01",type = "direct"):新建一个zhang.direct01交换机,类型为direct(如果已存在,不做任何处理)
key:路由键red。
使用注解创建的交换机和队列是绑定的。(推荐使用第二种注解方式,简单、直白)
@Autowired private RabbitTemplate rabbitTemplate; @Test void DirectSendMessage() { rabbitTemplate.convertAndSend("zhang.direct01","","张三,你好啊"); rabbitTemplate.convertAndSend("zhang.direct01","red","张三,你好啊。路由键为red"); }
引入rabbitTemplate,使用convertAndSend方法就能发送消息到交换机,zhang.direct01是交换机的名称,第二个参数是路由键,(也可以不设置,不设置时表示每个与交换机绑定的队列都能接收到消息)、第三个参数就是要发送的消息。
现在,我们已经创建了交换机zhang.direct01,创建了消息队列zhang.queue01,并将它们进行了绑定,指定他们的路由键为red。并且向zhang.direct01发送了两条消息,一条没有指定路由键,一条指定路由键为red。(没有指定路由键时,那么所有与之绑定的队列都能接受到消息)
现在,我们就可以在消费者模块(Consumer)编写代码,来监听队列了。
// 监听普通的消息 @RabbitListener(queues = {"zhang.queue01"}) public void listenerQueue(String msg){ System.out.println("消费者监听了队列zhang.queue01,接收到消息:"+msg); }
我们在一个bean中编写监听代码,使用@RabbitListener注解,并指定监听的消息队列为zhang.queue01,接收一个String类型的参数msg,那么队列中缓存的消息就会映射到msg中,在控制台进行输出,(这个参数类型要与生产者发送到交换机的消息类型一致,不然会报错)将这个模块运行起来就能一直监听zhang.queue01这个消息队列了。
如图所示,我们监听到了,zhang.queue01,这个对类中的所有消息。
现在,我们已经能发送和接收消息了,接下来,我们重点关注一下交换机和 路由键。
1、Direct Exchange(直连交换机): Direct Exchange 是最简单的交换机类型。它根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。如果消息的路由键与某个队列绑定的路由键完全相同,那么消息将被发送到该队列。
2、Fanout Exchange(扇形交换机): Fanout Exchange 将消息广播到与之绑定的所有队列。它忽略消息的路由键,只需将消息发送到与之绑定的所有队列即可。
3、Topic Exchange(主题交换机): Topic Exchange 根据消息的路由键模式进行匹配和分发。它使用带有通配符的路由键进行匹配,支持两种通配符符号:*(匹配一个单词)和#(匹配零个或多个单词)。例如,路由键 "stock.usd.nyse" 可以匹配绑定了 "stock..*" 或者 "stock.#" 的队列。
这三种路由键是我们最常用到的,我们之前建立的就是Direct类型的交换机,接下来我们建立第二种交换机Fanout,并绑定队列
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "zhang.queue02",durable = "true"), exchange = @Exchange(value = "zhang.fanout",type = ExchangeTypes.FANOUT) )) @Test void FanoutSendMessage() { rabbitTemplate.convertAndSend("zhang.fanout","","张三,你好啊,广播模式"); }
Fanout忽略路由键,所以我们创建时,可以不写路由键。
在消费者模块监听zhang.queue02队列
//监听广播消息 @RabbitListener(queues = {"zhang.queue02"}) public void listenerBroadcast(String msg){ System.out.println("消费者监听了队列zhang.queue02,接收到消息:"+msg); }
重新启动模块,就能监听到zhang.queue02的消息:
建立Topic 交换机,它与Direct交换机不同的是,Topic的路由键可以使用通配符,
*
(星号):匹配一个单词。例如,绑定键为 "stock.*"
的队列将匹配到路由键为 "stock.apple"
、"stock.microsoft"
等。
#
(井号):匹配零个或多个单词。例如,绑定键为 "stock.#"
的队列将匹配到路由键为 "stock"
、"stock.apple"
、"stock.apple.us"
等。
接下来,我们来演示一下:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "zhang.queue03"), exchange = @Exchange(value = "zhang.topic01",type = ExchangeTypes.TOPIC), key = {"china.*","#.queue","mm.nn"} )) @Test void TopicSendMessage() { rabbitTemplate.convertAndSend("zhang.topic01","mm.nn","张三,你好啊,mm.nn"); rabbitTemplate.convertAndSend("zhang.topic01","mm.queue","张三,你好啊,mm.queue"); rabbitTemplate.convertAndSend("zhang.topic01","china.nn","张三,你好啊,china.nn"); rabbitTemplate.convertAndSend("zhang.topic01","mm.china","张三,你好啊,mm.china"); rabbitTemplate.convertAndSend("zhang.topic01","queue.nn","张三,你好啊,queue.nn"); }
我们创建了zhang.queue03队列,绑定了zhang.topic01,并制定了三个路由键。使用了通配符的形式。然后发送了五条消息,第四、五条消息不符合规则。因此不能接受到。
到这里,我们整合RabbitMQ实现快速入门就完成了。然后补充一点知识:
如果发送的消息为对象类型时,由于Rabbit默认使用了JDK的序列化方式(底层会判断你传入的是不是message类型,如果不是会使用消息类型转化器转为消息类型),就算你收发双方都使用了同一种类型。那么,消费者在接收时也会报错,这是由于接收双发使用的消息类型转化器不一致。
生产者发送的消息类型为Student
消费者接收的消息类型也为Student
运行时报错,原因是:发送方与接收方的消息类型转换器不统一:
我们有两种解决方案,
<一>:自定义消息类型转换器,使接收方与发送方的消息类型转换器一致
导入json的依赖,MessageConverter 的类型为:
@Configuration
public class MessageConvent {
@Bean
public MessageConverter jsonMessageConvert() {
return new Jackson2JsonMessageConverter();
}
}
在收发双方同时加入自定义消息类型转换器就可以了。
<二>:在发送消息时,将对象转化为字符串,接收方依旧使用String接收。
第一种方式更加正规一些,但是第二种方式简单一些。根据自己的需求选择适用于自己的方式。
一般来说:在生产者中,进行交换机的创建、消息队列的创建、绑定关系,与发送消息(推荐使用@RabbitListener注解)
在消费者中,只进行监听队列的绑定。。。。。
我在使用@RabbitListener注解时遇到了一些问题,在这里我记录一下,来警示自己。
我在使用@RabbitListener创建交换机与队列时,有时候会报错误,显示找不到队列。我感到很惊奇,明明@RabbitListener注解就是用来创建交换机和队列的,怎么会显示找不到队列呢?我之后请教了学校的老师后才发现,原来使用@RabbitListener创建交换机与队列时,有一些前置的条件要满足。
@RabbitListener 注解会自动声明队列、交换机和绑定。但是,它只会在第一次启动应用程序时执行,以确保队列、交换机和绑定的声明只发生一次。如果在后续的运行中,队列已经存在或者配置发生了变化,@RabbitListener
注解将不会再次声明队列。
因此,如果你在代码中使用 @RabbitListener注解监听的队列名称,但是该队列在 RabbitMQ 中并不存在,那么在第一次运行时会自动创建该队列。但如果在后续的运行中,你手动删除了该队列,或者更改了队列的配置,再次启动应用程序时@RabbitListener注解不会再次创建该队列。
解决该问题的方法是,在应用程序启动之前,在控制台(15672)手动创建队列或者在配置文件中配置队列的声明。这样可以确保队列的存在,并且与 @RabbitListener 注解中指定的队列名称一致。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。