赞
踩
学习目标
1.MQ相关概念
2.RabbitMQ的安装和配置
3.RabbitMQ入门程序
4.RabbitMQ的工作模式
5.Spring整合RabbitMQ
6.SpringBoot整合RabbitMQ
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息;MQ多用于分布式系统之间进行通讯。
分布式系统有两种通讯方式:
两种通讯方式过程如图所示:
小结
MQ的优势主要有三点:
分布式系统在使用直接远程调用时通讯过程如下:
在订单系统中直接远程调用库存系统、支付系统、物流系统会使得订单系统的耦合度非常高,如果在加一个其他系统的话,必须在订单系统中修改代码,这也使得订单系统的可维护性很差。
另一方面如果库存系统挂掉的话,订单系统也会随着挂掉,这使得订单系统的容错性极差。
如果加入MQ消息中间件可以大大改善上面的问题,如下图所示:
一方面使得订单系统和其他系统得到解耦,另一方面如果库存系统服务器挂了并不影响订单系统,只要等到库存系统服务器恢复后在执行MQ消息就好了;再一方面如果加一个其他系统只要在读取MQ消息就可以了;总体来说让系统解耦,提供系统可维护性,提供系统容错性。
接下来再从访问速度角度来看分布式系统直接远程调用过程,如下图所示:
总的下单时间=100MS+100MS+100MS+30MS=330MS,这时间也太长了吧,响应速度太慢。
如果加入MQ消息中间件,我们发现服务器的响应速度会得到大大提升,如下图所示:
总的下单时间=3ms+30ms=33ms,服务器的响应速度得到了大大提升,总的下单时间和其他系统得到消息时间以及其他系统运行时间无关,只要订单系统运行完毕并发送完消息就可以响应给客户了;大大增加了用户体验和系统吞吐量(单位时间内处理请求的数目)。
最后我们来看削峰填谷情况,如下图所示:
使用MQ就能解决这个问题,过程如下:
MQ起到了"削峰填谷"的作用,如下所示
MQ 限制了消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被"削"掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压消息,这就叫做"填谷"。
接下来我们通过下图展示MQ的劣势:
既然MQ有优势也有劣势,那么MQ的应用条件应满足什么?
生产者不需要从消费者获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完继续往后走,即所谓异步称为了可能。
允许短暂的不一致性。
确实是用了有效果。即解耦,提速,削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
目前市面上MQ产品较多,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka等,也有直接使用redis充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP、XMPP、SMTP、STOMP | OpenWire、STOMP、REST、XMPP、AMQP | 自定义 | 自定义协议,社区封装了http协议支持 |
客户端语言支持 | 官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言 | Java,C,C++,Python,PHP,Perl,.net等 | Java,C++(不成熟) | 官方支持Java,社区产出多种API,如PHP,Python等 |
单机吞吐量 | 万级(其次) | 万级(最差) | 十万级(最好) | 十万级(次之) |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
功能特性 | 并发能力强,性能积极好,延时低,社区活跃,管理界面丰富 | 老牌产品,成熟度高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 |
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件限制。2006年,AMQP规范发布。协议内容如下图所示:
2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ1.0发布。RabbitMQ采用Erlang语言开发。Erlang语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ 基础架构图如下图:
RabbitMQ相关概念:
RabbitMQ 提供了6种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式,Routing路由模式、Topics主题模式、RPC远程调用模式(远程调用,不太算MQ;暂不介绍)。官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
RabbitMQ 官方地址:http://www.rabbitmq.com/,访问该网址可以查看到RabbitMQ安装教程。
【1】安装依赖环境:在线安装依赖环境
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
【2】安装Erlang:上传
erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm
# 安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
如果出现如下错误
说明gblic 版本太低。我们可以查看当前机器的gblic 版本
strings /lib64/libc.so.6 | grep GLIBC
当前最高版本2.12,需要2.15.所以需要升级glibc
使用yum更新安装依赖
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y
下载rpm包
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-utils-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-static-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/nscd-2.17-55.el6.x86_64.rpm &
安装rpm包
sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
安装完毕后再查看glibc版本,发现glibc版本已经到2.17了
strings /lib64/libc.so.6 | grep GLIBC
【3】安装RabbitMQ
# 安装
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
# 安装
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
【4】开启管理界面及配置
# 开启管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
【5】启动
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
设置配置文件
cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
我们在Centos7虚拟机中使用Docker来安装。
【1】下载镜像
在线拉取
docker pull rabbitmq:3-management
使用命令加载镜像即可:
docker load -i mq.tar
【2】安装MQ,执行下面的命令来运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
首先我们来认识一下RabbitMQ管理页面的整体布局,如下图所示:
接下来点击Admin->Add a User 按如下图操作添加一个用户
接下来,我们添加一个虚拟机,虚拟机都是以/开头,我们创建/psjjmq虚拟机,如下所示:
最后,我们为虚拟机分配用户权限,将/psjjmq虚拟机分配给psjj用户,如下所示:
最终结果如下:
打开概述页面,我们发现没有rabbitmq.config配置文件,如下图所示:
接下来我们复制配置文件,跳转rabbitmq默认安装目录
cd /usr/share/doc/rabbitmq-server-3.6.5/
将目录下的rabbitmq.config.example文件复制到/etc/rabbitmq/rabbitmq.config中,如下所示:
cp ./rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
最后重启rabbitmq服务
service rabbitmq-server restart
问题解决了,最终结果如下:
入门案例需求:使用简单模式完成消息传递
实现思路:
创建maven父工程rabbitmq-day01,然后在该工程下创建rabbitmq-producer(消息生产者)和rabbitmq-consumer(消息消费者)两个子模块,如下图所示:
然后在两个子模块中添加rabbitmq客户端依赖,如下所示:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
首先我们在rabbitmq-producer模块编写生产者发送消息程序:
/** * 生产者发送消息 */ public class ProducerHelloWorld { public static void main(String[] args) throws Exception { //1.创建工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置连接参数 factory.setHost("192.168.15.142");//默认值localhost factory.setPort(5672);//默认值5672 factory.setVirtualHost("/psjjmq");//虚拟机 默认值/ factory.setUsername("psjj"); //用户名默认值guest factory.setPassword("123456"); //密码默认值guest //3.创建连接 Connection connection = factory.newConnection(); //4.创建channel Channel channel = connection.createChannel(); //5.创建队列 /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ String queueName = "hello_world"; //如果没有一个名字叫hello_world的队列,则会创建,否则则不会创建 channel.queueDeclare(queueName,true,false,false,null); /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数: 1. exchange:交换机名称。简单模式下交换机会使用默认的 "" 2. routingKey:路由名称 3. props:配置信息 4. body:发送消息数据 */ String message = "hello world"; //6.发送消息 channel.basicPublish("",queueName,null,message.getBytes()); //7.释放资源 channel.close(); connection.close(); } }
接下来我们在rabbitmq-consumer模块编写消费者接收消息程序
/** * 消息消费者 */ public class ConsumerHelloWorld { public static void main(String[] args) throws Exception { //1.创建工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置连接参数 factory.setHost("192.168.15.142");//默认值localhost factory.setPort(5672);//默认值5672 factory.setVirtualHost("/psjjmq");//虚拟机 默认值/ factory.setUsername("psjj"); //用户名默认值guest factory.setPassword("123456"); //密码默认值guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("hello_world",true,false,false,null); // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ channel.basicConsume("hello_world",true,consumer); //不要关闭资源 } }
生产者流程:
消费者流程:
RabbitMQ五种常用的工作模式有:简单模式、工作队列模式、 发布与订阅模式(广播模式),路由模式、主题模式(通配符模式)。详细讲解如下。
工作模式的执行过程如下图所示:
代码实现过程同入门程序类似,不同的消费者端有两个;生产者代码如下:
package top.psjj.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消息生产者,工作队列模式 */ public class ProducerWorkQueue { public static void main(String[] args) throws Exception{ //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.15.142"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/psjjmq"); connectionFactory.setUsername("psjj"); connectionFactory.setPassword("123456"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建通道 Channel channel = connection.createChannel(); //4.创建队列 队列名、是否持久化, 是否独占已经关闭连接是否还在,没有consumer是否自动删除,内部参数 channel.queueDeclare("work_queues",true,false,false,null); //5.发送消息 for (int i = 1; i <=10 ; i++) { String body = "~~~工作队列消息~~~"+i; //交换机,routingKey,配置信息,内容 channel.basicPublish("","work_queues",null,body.getBytes()); } //6.关闭资源 channel.close(); connection.close(); } }
消费者代码如下:
package top.psjj.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * 工作队列模式消息消费者 */ public class ConsumerWorkQueueA { public static <handleDelivery> void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.15.142"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/psjjmq"); connectionFactory.setUsername("psjj"); connectionFactory.setPassword("123456"); //2.获取新连接 Connection connection = connectionFactory.newConnection(); //3.创建通道 Channel channel = connection.createChannel(); //4.声明队列:队列名,是否持久化,否独占已经关闭连接是否还在,没有consumer是否自动删除,内部参数 channel.queueDeclare("work_queues",true,false,false,null); //5.获得消息消费者 Consumer consumer = new DefaultConsumer(channel){ //唯一标识,获得交换机routingKey信息,配置信息,发送内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; //6.接收消息,队列名,是否启动确认,消息对象回调 channel.basicConsume("work_queues",true,consumer); } }
package top.psjj.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * 工作队列模式消息消费者 */ public class ConsumerWorkQueueB { public static <handleDelivery> void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.15.142"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/psjjmq"); connectionFactory.setUsername("psjj"); connectionFactory.setPassword("123456"); //2.获取新连接 Connection connection = connectionFactory.newConnection(); //3.创建通道 Channel channel = connection.createChannel(); //4.声明队列:队列名,是否持久化,否独占已经关闭连接是否还在,没有consumer是否自动删除,内部参数 channel.queueDeclare("work_queues",true,false,false,null); //5.获得消息消费者 Consumer consumer = new DefaultConsumer(channel){ //唯一标识,获得交换机routingKey信息,配置信息,发送内容 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; //6.接收消息,队列名,是否启动确认,消息对象回调 channel.basicConsume("work_queues",true,consumer); } }
运行两个消息消费者,在运行消息生产者,我们发现,两个消费者消费消息各是一部分,证明多个消费者之间是抢占关系。
发布订阅模式也叫广播模式,在订阅模型中,多了一个Exchange角色,而且过程略有变化:
**注意:**Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
生产者代码实现过程:
package top.psjj.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 发布订阅模式:消息生产者 */ public class ProducerPubSub { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.15.142"); factory.setPort(5672); factory.setUsername("psjj"); factory.setPassword("123456"); factory.setVirtualHost("/psjjmq"); //2.获得连接 Connection connection = factory.newConnection(); //3.获得通道 Channel channel = connection.createChannel(); //4.声明交换机 String exchangeName = "my_fanout"; /** * 参数: * 1.交换机名称 * 2.交换机类型 * direct:定向 路由模式 * fanout 扇形 订阅广播模式 * topic : 通配符 主题模式 * 3.是否持久化 * 4.是否自动删除 * 5.internal:内部使用 一般false * 6.arguments:参数 */ channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); //6.创建队列 String queueName1 = "queue1"; String queueName2 = "queue2"; channel.queueDeclare(queueName1,true,false,false,null); channel.queueDeclare(queueName2,true,false,false,null); //7.绑定队列和交换机:队列名称,交换机名称,routingKye fanout类型设置为"" channel.queueBind(queueName1,exchangeName,""); channel.queueBind(queueName2,exchangeName,""); //8.发送消息 String body = "发布订阅消息"; channel.basicPublish(exchangeName,"",null,body.getBytes()); //9.释放资源 channel.close(); connection.close(); } }
消费者代码实现过程:
package top.psjj.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * 发布订阅模式:消息消费者 */ public class ConsumerPubSub1 { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.15.142"); factory.setPort(5672); factory.setVirtualHost("/psjjmq"); factory.setUsername("psjj"); factory.setPassword("123456"); //2.创建连接 Connection connection = factory.newConnection(); //3.创建channel Channel channel = connection.createChannel(); //4.接收消息 String queueName = "queue1"; Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(queueName,true,consumer); } }
package top.psjj.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * 发布订阅模式:消息消费者 */ public class ConsumerPubSub2 { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.15.142"); factory.setPort(5672); factory.setVirtualHost("/psjjmq"); factory.setUsername("psjj"); factory.setPassword("123456"); //2.创建连接 Connection connection = factory.newConnection(); //3.创建channel Channel channel = connection.createChannel(); //4.接收消息 String queueName = "queue2"; Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(queueName,true,consumer); } }
路由模式说明:
整体过程如下图所示:
图解:
案例需求:
生产者发送消息,routingKey为error消息由消费者1接收,routtingKey为info、error、warning消息由消费者2接收,具体代码如下:
生产者发送消息:
package top.psjj.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 路由模式消息生产者 */ public class ProducerRouting { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.15.142"); factory.setPort(5672); factory.setVirtualHost("/psjjmq"); factory.setUsername("psjj"); factory.setPassword("123456"); //2.创建连接 Connection connection = factory.newConnection(); //3.创建通道 Channel channel = connection.createChannel(); //4.创建交换机 String exchangeName = "my_direct"; //参数:交换机名称、交换机类型、是否持久化,是否自动删除,内部使用一般为false,参数 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null); //5.创建队列 String queue1 = "directQueue1"; String queue2 = "directQueue2"; channel.queueDeclare(queue1,true,false,false,null); channel.queueDeclare(queue2,true,false,false,null); //6.绑定队列和交换机:队列名,交换机名,routingKey channel.queueBind(queue1,exchangeName,"error"); channel.queueBind(queue2,exchangeName,"info"); channel.queueBind(queue2,exchangeName,"error"); channel.queueBind(queue2,exchangeName,"warning"); String body = "~~~路由模式消息内容~~"; //7.发送消息 //channel.basicPublish(exchangeName,"error",null,body.getBytes()); channel.basicPublish(exchangeName,"warning",null,body.getBytes()); //8.释放资源 channel.close(); connection.close(); } }
消费者接收消息:
package top.psjj.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * 路由模式消息消费者 */ public class ConsumerRouting1 { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.15.142"); factory.setPort(5672); factory.setVirtualHost("/psjjmq"); factory.setUsername("psjj"); factory.setPassword("123456"); //2.新建连接 Connection connection = factory.newConnection(); //3.获得通道 Channel channel = connection.createChannel(); //4.创建消费者对象 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("唯一标识:"+consumerTag); System.out.println("交换机"+envelope.getExchange()); System.out.println("routingkey:"+envelope.getRoutingKey()); System.out.println(new String(body)); } }; //5.接收消息 channel.basicConsume("directQueue1",true,consumer); } }
package top.psjj.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * 路由模式消息消费者 */ public class ConsumerRouting2 { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.15.142"); factory.setPort(5672); factory.setVirtualHost("/psjjmq"); factory.setUsername("psjj"); factory.setPassword("123456"); //2.新建连接 Connection connection = factory.newConnection(); //3.获得通道 Channel channel = connection.createChannel(); //4.创建消费者对象 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("唯一标识:"+consumerTag); System.out.println("交换机"+envelope.getExchange()); System.out.println("routingkey:"+envelope.getRoutingKey()); System.out.println(new String(body)); } }; //5.接收消息 channel.basicConsume("directQueue2",true,consumer); } }
运行上述代码,当发送消息时设置routingKey为error时,消费者1,消费者2都能接收到消息,当发送消息时设置routingKey为warning时,只有消费者2能接收到消息;证明路由模式成功。测试修改代码如下:
//channel.basicPublish(exchangeName,"error",null,body.getBytes());
channel.basicPublish(exchangeName,"warning",null,body.getBytes());
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
主题模式又叫通配符模式;Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
通配符具体运行流程如下图所示:
*.orange.*
的消息*.*.rabbit
或者lazy.#的消息通过通配符模式完成如下需求:
生产者发送消息的routingKey为top.psjj.java 或者www.top.psjj.java
消费者1接收满足routingKey为*.psjj.*
条件的消息,消费者2接收满足routingKey为#.psjj.#
条件的消息;当发送消息时routingKey为top.psjj.java时,消费者1、消费者2都能接收到消息;当发送消息时routingKey为www.top.psjj.java
时,只有消费者2能接收到消息。
生产者代码:
package top.psjj.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 通配符模式 消息生产者 */ public class ProducerTopic { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.15.142"); factory.setPort(5672); factory.setVirtualHost("/psjjmq"); factory.setUsername("psjj"); factory.setPassword("123456"); //2.创建连接 Connection connection = factory.newConnection(); //3.获得通道 Channel channel = connection.createChannel(); //4.创建交换机 String exchangeName = "my_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); //5.创建队列 String queue1 = "topic_queue1"; String queue2 = "topic_queue2"; channel.queueDeclare(queue1,true,false,false,null); channel.queueDeclare(queue2,true,false,false,null); //6.绑定队列和交换机 channel.queueBind(queue1,exchangeName,"*.psjj.*"); channel.queueBind(queue2,exchangeName,"#.psjj.#"); //7.发送消息 String body = "``topic消息``"; //channel.basicPublish(exchangeName,"top.psjj.java",null,body.getBytes()); channel.basicPublish(exchangeName,"www.top.psjj.java",null,body.getBytes()); //8.关闭资源 channel.close(); connection.close(); } }
消费者代码:
package top.psjj.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * 通配符模式:消息消费者 */ public class ConsumerTopic1 { public static void main(String[] args) throws Exception { //1.创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.15.142"); factory.setPort(5672); factory.setVirtualHost("/psjjmq"); factory.setUsername("psjj"); factory.setPassword("123456"); //2.创建连接 Connection connection = factory.newConnection(); //3.创建通道 Channel channel = connection.createChannel(); //4.创建消费者对象 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息内容:"+new String(body)); } }; //5.接收消息 channel.basicConsume("topic_queue1",true,consumer); } }
package top.psjj.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * 通配符模式:消息消费者 */ public class ConsumerTopic2 { public static void main(String[] args) throws Exception { //1.创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.15.142"); factory.setPort(5672); factory.setVirtualHost("/psjjmq"); factory.setUsername("psjj"); factory.setPassword("123456"); //2.创建连接 Connection connection = factory.newConnection(); //3.创建通道 Channel channel = connection.createChannel(); //4.创建消费者对象 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息内容:"+new String(body)); } }; //5.接收消息 channel.basicConsume("topic_queue2",true,consumer); } }
Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
使用Spring整合RabbitMQ可以大大简化开发步骤,生产者整合过程如下:
消费者整合
生产者整合过程如下:
【1】创建子模块spring-rabbitmq-producer,并添加如下依赖
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.15.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.2.15.RELEASE</version> </dependency> </dependencies>
【2】配置rabbitmq.properties的基本信息
rabbitmq.host=192.168.15.142
rabbitmq.port=5672
rabbitmq.username=psjj
rabbitmq.password=123456
rabbitmq.virtual-host=/psjjmq
【3】整合Spring文件spring-rabbitmq-producer.xml,内容如下
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties" /> <!--注入connectionFactory--> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" virtual-host="${rabbitmq.virtual-host}" username="${rabbitmq.username}" password="${rabbitmq.password}" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory" /> <!--定义简单类型队列,不绑定交换机则绑定默认交换机,默认交换机类型为direct,名字为"",路由建为队列名称 id:bean的名称 name:queue名称 auto-declare:自动创建 auto-delete:自动删除,最后一个消费者和该队列断开连接后,自动删除队列 exclusive:是否独占 durable:是否持久化 --> <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true" /> <!--定义广播模式消息队列:所有队列都能收到消息--> <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true" /> <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true" /> <!--定义广播类型交换机:并绑定上述两个队列--> <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="spring_fanout_queue_1" /> <rabbit:binding queue="spring_fanout_queue_2" /> </rabbit:bindings> </rabbit:fanout-exchange> <!--定义路由模式消息队列--> <rabbit:queue id="spring_direct_queue_1" name="spring_direct_queue_1" auto-declare="true" /> <rabbit:queue id="spring_direct_queue_2" name="spring_direct_queue_2" auto-declare="true" /> <!--定义路由模式交换机:并绑定上述两个队列--> <rabbit:direct-exchange id="spring_direct_exchange" name="spring_direct_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="spring_direct_queue_1" key="info" /> <rabbit:binding queue="spring_direct_queue_2" key="info" /> <rabbit:binding queue="spring_direct_queue_2" key="error" /> </rabbit:bindings> </rabbit:direct-exchange> <!--定义通配符模式消息队列--> <rabbit:queue id="spring_topic_queue_1" name="spring_topic_queue_1" auto-declare="true" /> <rabbit:queue id="spring_topic_queue_2" name="spring_topic_queue_2" auto-declare="true" /> <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding pattern="*.psjj.*" queue="spring_topic_queue_1" /> <rabbit:binding pattern="#.psjj.#" queue="spring_topic_queue_2" /> </rabbit:bindings> </rabbit:topic-exchange> <!--注入rabbitMQ对象--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" /> </beans>
【4】编写测试类
package top.psjj.producer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * spring整合RabbitMQ生产者测试 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { //注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test //简单模式 public void helloWorld(){ rabbitTemplate.convertAndSend("spring_queue","hello world"); } @Test//广播模式 public void fanoutTest(){ rabbitTemplate.convertAndSend("spring_fanout_exchange","","广播消息"); } @Test//路由模式 public void directTest(){ rabbitTemplate.convertAndSend("spring_direct_exchange","info","路由消息"); } @Test//通配符模式 public void topicTest(){ rabbitTemplate.convertAndSend("spring_topic_exchange","top.psjj.java","通配符消息"); } }
消费者整合过程如下:
【1】创建子模块spring-rabbitmq-consumer,并添加如下依赖
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.15.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.2.15.RELEASE</version> </dependency> </dependencies>
【2】配置rabbitmq.properties的基本信息
rabbitmq.host=192.168.15.142
rabbitmq.port=5672
rabbitmq.username=psjj
rabbitmq.password=123456
rabbitmq.virtual-host=/psjjmq
【3】整合Spring文件spring-rabbitmq-consumer.xml,内容如下
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties" /> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--注入队列监听器--> <bean id="springQueueListener" class="top.psjj.consumer.SpringSimpleListener"/> <!--配置监听器容器以及队列相关信息--> <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true"> <rabbit:listener ref="springQueueListener" queue-names="spring_queue" /> </rabbit:listener-container> </beans>
【4】编写监听器
package top.psjj.consumer; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; /** * @Auther: 胖叔讲java * @Date: 2023/2/17 - 02 - 17 - 1:08 * @Decsription: top.psjj.consumer * @version: 1.0 */ public class SpringSimpleListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println("简单模式:"+new String(message.getBody())); } }
【5】编写测试
package top.psjj.consumer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest { @Test public void test1(){ boolean flag = true; while (true){ } } }
注意:由于消费端代码都一样这里这写一个。
使用SpringBoot整合RabbitMQ可以大大简化开发步骤,生产者整合过程如下:
消费者整合
生产者整合过程如下:
【1】创建SpringBoot父工程springboot-rabbitmq-day01,pom文件如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <modules> <module>springboot-rabbitmq-procuder</module> <module>springboot-rabbitmq-consumer</module> </modules> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.12.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>top.psjj</groupId> <artifactId>springboot-rabbitmq-day01</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-rabbitmq-day01</name> <description>Demo project for Spring Boot</description> <packaging>pom</packaging> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
【2】创建子模块生产者模块springboot-rabbitmq-procuder,添加如下依赖
<dependencies>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
【3】配置连接信息
# 配置Rabbitmq的基本信息
spring:
rabbitmq:
host: 192.168.15.142
port: 5672
username: psjj
password: 123456
virtual-host: /psjjmq
【4】编写配置类
/** * rabbitmq配置类 */ @Configuration public class RabbitMQConfig { //定义交换机名字 public static final String EXCHANGE_NAME = "boot_topic_exchange"; //定义队列名字 public static final String QUEUE_NAME = "boot_topic_queue1"; //注入交换机 @Bean("bootExchange") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //注入队列 @Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } /** * 绑定队列与交换机关系 */ @Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } }
【5】编写测试
/**
* 消息生产者测试
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
//注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.java","~~~springboot-topic-消息~~~");
}
}
消费者整合过程如下:
【1】创建子模块生产者模块springboot-rabbitmq-consumer,添加如下依赖
<dependencies>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
【2】配置连接信息
# 配置Rabbitmq的基本信息
spring:
rabbitmq:
host: 192.168.15.142
port: 5672
username: psjj
password: 123456
virtual-host: /psjjmq
【3】编写配置类
package top.psjj.consumer.mq.listener; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component public class RabbitMQListener { @RabbitListener(queues = "boot_topic_queue1") public void ListenerQueue(Message message){ System.out.println(new String(message.getBody())); } }
【4】编写启动器
/**
* springBoot启动器
*/
@SpringBootApplication
public class RabbitMQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQConsumerApplication.class,args);
}
}
使用SpringBoot整合RabbitMQ可以大大简化开发步骤,生产者整合过程如下:
消费者整合
创建消费者工程
添加场景启动器
配置连接信息
编写监听器
答:消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。面向消息的系统(消息中间件)是在分布式系统中完成消息的发送和接收的基础软件。异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中间件。
答:生产者工作流程如下:
消费者工作流程如下:
consumer先连接到Broker,建立连接Connection,开启一个信道(Channel)。
向Broker请求消费响应的队列中消息,可能会设置响应的回调函数。
等待Broker回应并投递相应队列中的消息,接收消息。
消费者确认收到的消息,ack。
RabbitMq从队列中删除已经确定的消息。
关闭信道。
关闭连接。
如何自定义消息中间件?
答:使用 BlockingQueue(阻塞队列)实现;当队列容器已满时,生产者线程被阻塞,直到队列未满后才可以继续put;当队列容器为空时,消费者线程被阻塞,直至队列非空时才可以继续take。
当前业界比较流行的开源消息中间件包括:ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等,其中应用最为广泛的要数RabbitMQ、RocketMQ、Kafka这三款。
作为消息队列,要具备以下几个特性:
**RabbitMQ:**RabbitMQ开始是用在电信业务的可靠通信的,也是少有的几款支持AMQP协议的产品之一。
优点:
缺点:
RocketMQ
优点
缺点:跟周边系统的整合和兼容不是很好。
Kafka
Kafka的可靠性,稳定性和功能特性基本满足大多数的应用场景。
跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持Kafka。
Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。
Kafka是Scala和Java开发的,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。它的异步消息的发送和接收是三个中最好的,但是跟RocketMQ拉不开数量级,每秒处理几十万的消息。
如果是异步消息,并且开启了压缩,Kafka最终可以达到每秒处理2000w消息的级别。
但是由于是异步的和批处理的,延迟也会高,不适合电商场景。
消息队列的优缺点?
答:优势有:
1. 应用解耦:提高系统容错性和可维护性
2. 异步提速:提升用户体验和系统吞吐量
3. 削峰填谷:提高系统稳定性
劣势有:
系统的可用性降低:系统引入的依赖越多,系统稳定性越差,一旦MQ宕机,就会对业务造成影响,如何保证MQ的高可用。
系统复杂性提高:MQ的加入增加了系统的复杂度,以前系统是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
一致性问题:A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,如何保证消息数据处理的一致性?
JMS规范和AMQP协议
答:JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。
AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于JMS,兼容JMS协议。
答:1.simple模式(即最简单的收发模式):消息产生消息,将消息放入队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IoNWDhgB-1692091646688)(./assets/25.png)]
消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
2.work工作模式(资源的竞争)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sKXpbuEg-1692091646689)(./assets/28.png)]
消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
3.publish/subscribe发布订阅(共享资源)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3agb1kbG-1692091646690)(./assets/29.png)]
每个消费者监听自己的队列;
生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
4.routing路由模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z7cVriv6-1692091646690)(./assets/30.png)]
消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
根据业务功能定义路由字符串
从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
5.topic 主题模式(路由模式的一种)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1tuwbnxn-1692091646691)(./assets/31.png)]
星号井号代表通配符
*匹配一个 #匹配0个或者多个
路由功能添加模糊匹配
消息产生者产生消息,把消息交给交换机
交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)
8.Connection 和Channel关系?
答:生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。
9.什么不直接使用TCP连接,而是使用信道?
答:RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。当信道本身的流量很大时,一个Connection就会产生性能瓶颈,流量被限制。需要建立多个Connection分摊信道。信道在AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。
10.说说Broker服务节点、Queue队列、Exchange交换器?
答:Broker可以看做RabbitMQ的服务节点。一般请下一个Broker可以看做一个RabbitMQ服务器。Queue:RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。Exchange:生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃。
答:生产者将消息发送给交换器的时候,会指定一个RoutingKey,用来指定这个消息的路由规则,这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
答:通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,这样RabbitMq就知道如何正确路由消息到队列了。
答:主要有以下4种。
答:消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);
常用的交换器主要分为一下三种:
答:若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能。
答:每一个RabbitMQ服务器都能创建虚拟的消息服务器,也叫虚拟主机(virtual host),简称vhost。默认为“/”。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。