赞
踩
目录
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不 用依赖其他服务。
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正 常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限 制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分 散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体 验要好。
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合 调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于 消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在 这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流 系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可 以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题, A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此 消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不 用做这些操作。A 服务还能及时的得到异步处理成功的消息。
产生数据发送消息的程序是生产者
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
这里使用docker下载安装带有管理平台的rabbitmq
docker pull rabbitmq:3.7.8-management
完成后启动rabbitmq
docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.7.8-management
其中5672是rabbitmq的端口,15672是管理平台的端口
在浏览器访问,输入虚拟机的ip和管理平台端口号
启动完成
启动rabbitmq后,根据容器名进入控制台
docker exec -it 55c525da951f /bin/bash
1、查看用户列表
rabbitmqctl list_users
guest是docker管理页面的默认管理员,密码同用户名
添加一个新的用户
创建账号,用户名密码都是root
rabbitmqctl add_user root root
设置用户角色
rabbitmqctl set_user_tags root administrator
设置用户权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"
用户 root具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 当前用户和角色
再次查看用户列表
rabbitmqctl list_users
添加完成,至此,rabbitmq的环境搭建完毕
当然,也可以用guest账户在图形化界面中进行用户管理等功能
创建一个maven工程,引入如下依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
代码如下
- @Test
- void testSendMessage2Queue() {
- //队列名
- String queueName = "qOne";
- //待发送消息
- User user = new User("han", 25);
- //发送消息方法
- rabbitTemplate.convertAndSend(queueName, user);
- }
- package com.itheima.consumer.listeners;
-
- import com.itheima.consumer.entity.User;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.ExchangeTypes;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
-
- @Slf4j
- @Component
- public class MqListener {
-
- @RabbitListener(queues = "qOne")
- public void listenSimpleQueue(User user){
- System.out.println("消费者收到了qOne的消息:【" + user +"】");
- }
-
- }

测试,启动消费者,再启动生产者,此时消费者会受到生产者的消息,包括堆积的消息
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程(消费者)将一起处理这些任务。
我们使用一个消费者通知两个相同队列接收者
- @RabbitListener(queues = "q1")
- public void listenWorkQueue1(String user) throws InterruptedException {
- System.out.println("消费者1 收到了 work.queue的消息:【" + user +"】");
- Thread.sleep(20);
- }
-
- @RabbitListener(queues = "q1")
- public void listenWorkQueue2(String msg) throws InterruptedException {
- System.err.println("消费者2 收到了 work.queue的消息...... :【" + msg +"】");
- Thread.sleep(20);
- @Test
- void testWorkQueue() throws InterruptedException {
- String queueName = "q1";
- for (int i = 1; i <= 50; i++) {
- String msg = "hello, worker, message_" + i;
- rabbitTemplate.convertAndSend(queueName, msg);
- Thread.sleep(20);
- }
- }
让生产者从控制台接收我们输入的代码,以测试消费者是否以轮询模式接收
我们会发现,当两个消费者效率相同(sleep方法延时相同)时,两者会轮循地处理消息,下面我们将两者的效率变为不同,一个快一个慢,代码如下
- @RabbitListener(queues = "q1")
- public void listenWorkQueue1(String user) throws InterruptedException {
- System.out.println("消费者1 收到了 work.queue的消息:【" + user +"】");
- Thread.sleep(20);
- }
-
- @RabbitListener(queues = "q1")
- public void listenWorkQueue2(String msg) throws InterruptedException {
- System.err.println("消费者2 收到了 work.queue的消息...... :【" + msg +"】");
- //降低效率
- Thread.sleep(500);
这时我们会发现两者均分了消息,导致处理效率没有最大化,理想状态下,应该为能力强的处理的多,能力弱的服务处理的少,需要在消费者的配置文件中配置处理策略,如下配置即可让两者按照能力强弱进行消息的处理,而非平衡处理
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的 所有队列中。
注意:fanout模式下,不论队列信道的RoutingKey是否相同,绑定同一交换机的队列都会收到消息
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
- @RabbitListener(queues = "fanout.queue4")
- public void listenFanoutQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1 收到了 fanout的消息:【" + msg +"】");
- }
- @Test
- void testSendFanout() {
- String exchangeName = "han.fanout";
- String msg = "hello, everyone!";
- rabbitTemplate.convertAndSend(exchangeName, null, msg);
- }
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key)- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。- Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
注意:这里绑定队列采用了注解的形式,无需手动在rabbitmq的图形化界面中创建交换机、队列并执行绑定等操作,使用@RabbitListener注解,配合bindings参数即可实现自动创建队列、交换机、RoutingKey等操作
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.q1", durable = "true"),
- exchange = @Exchange(name = "han.direct", type = ExchangeTypes.DIRECT),
- key = {"red", "blue"}
- ))
- public void listenDirectQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1 收到了 direct.q1的消息:【" + msg +"】");
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.q2", durable = "true"),
- exchange = @Exchange(name = "han.direct", type = ExchangeTypes.DIRECT),
- key = {"red", "yellow"}
- ))
- public void listenDirectQueue2(String msg) throws InterruptedException {
- System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg +"】");
- }

- @Test
- void testSendDirect() {
- String exchangeName = "han.direct";
- String msg = "蓝色通知,警报解除,哥斯拉是放的气球";
- rabbitTemplate.convertAndSend(exchangeName, "red", msg);
- }
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。 只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以.
分割,例如:item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
- package com.itheima.consumer.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @Author hanyang
- * @Date 2024/5/19 16:20
- * @version 1.0
- *
- */
- @Configuration
- public class TopicConfiguration {
-
- //初始化交换机
- @Bean
- public TopicExchange topicExchange(){
- return new TopicExchange("han.topic");
- }
-
- //初始化队列1
- @Bean
- public Queue topicQ1(){
- return new Queue("topic.q1");
- }
-
- //初始化队列2
- @Bean
- public Queue topicQ2(){
- return new Queue("topic.q2");
- }
-
- //绑定队列1,使用RoutingKey:china.tv.sport进行绑定
- @Bean
- public Binding bindQ1(TopicExchange topicExchange,Queue topicQ1){
- return BindingBuilder.bind(topicQ1).to(topicExchange).with("china.tv.sport");
- }
-
- @Bean
- public Binding bindQ2(TopicExchange topicExchange,Queue topicQ2){
- return BindingBuilder.bind(topicQ2).to(topicExchange).with("china.tv.#");
- }
-
- }

- @RabbitListener(queues = "topic.q1")
- public void listenTopicQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1 收到了 topic.queue1的消息:【" + msg +"】");
- }
-
- @RabbitListener(queues = "topic.q2")
- public void listenTopicQueue2(String msg) throws InterruptedException {
- System.out.println("消费者2 收到了 topic.queue2的消息:【" + msg +"】");
- }
- @Test
- void testSendTopic() {
- String exchangeName = "han.topic";
- String msg = "今天天气挺不错,我的心情的挺好的";
- rabbitTemplate.convertAndSend(exchangeName, "china.tv.a", msg);
- }
-
- @Test
- void testSendTopicByAnno() {
- String exchangeName = "test.topic";
- String msg = "注解生成的绑定";
- rabbitTemplate.convertAndSend(exchangeName, "topic.test.a.b.c", msg);
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。