赞
踩
目录
1 安装RabbitMQ(在docker里安装 docker的快速入门)
利用SpringAMQP实现HelloWorld中的基础消息队列功能
三:发布、订阅模型-Fanout 广播(Exchange、Queue、Binding)
利用SpringAMQP演示FanoutExchange的使用
利用SpringAMQP演示DirectExchange的使用
步骤1:在consumer服务声明Exchange、Queue
步骤2:在publisher服务发送消息到DirectExchange
利用SpringAMQP演示TopicExchange的使用
步骤2:在publisher服务发送消息到TopicExchange
MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
1 优点:
时效性较强,可以立即得到结果
2 微服务间基于Feign的调用就属于同步方式
3 缺点:
耦合度高:每次加入新的需求,都要修改原来的代码
性能下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的 时间之和。
资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场 景下会极度浪费系统资
级联失败: 如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一 样,迅速导致整个微服务群故障
1 异步调用常见实现就是事件驱动模式
2 异步通信的事件驱动的优点:
耦合度低:服务提供者发布事件到MQ服务器,服务消费者通过订阅事件从MQ服务器获 取事件,新增消费服务增加订阅即可。
吞吐量提升 :用户调用服务只是调用了服务提供者的业务,然后由服务提供者发布事件 通知服务消费者即可,没有链级调用服务消费者。所以服务响应时间缩短
故障隔离:服务提供者和各个服务消费者的业务逻辑是隔离的 互不影响
流量削峰:服务提供者发布事件到MQ服务器中,MQ服务器通过事件驱动的架构Broker 来存储事件,然后先进先出的原则把事件通知服务消费者处理(服务消费者处 理完一个事件再处理下一个事件,这样就可以将高并发的请求降低 做到平缓 处理)
3 异步通信的缺点:
依赖于Broker的可靠性、安全性、吞吐能力
架构复杂了,业务没有明显的流程线,不好追踪管理
1单机部署
1.1.下载镜像
方式一:在线拉取
docker pull rabbitmq:3-management
方式二:从本地加载
提前准备镜像包
上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
1.2.安装MQ
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \ 环境变量 MQ管理平台的账号
-e RABBITMQ_DEFAULT_PASS=123321 \ 环境变量 MQ管理平台的密码
--name mq \ 给容器取一个名字
--hostname mq1 \ 主机名 可不配 但如果要做集群必配
-p 15672:15672 \ MQ的管理平台的端口
-p 5672:5672 \ MQ发布消息和订阅消息的端口
-d rabbitmq:3-management -d后台运行 rabbitmq:3-management:镜像名称
控制台访问地址:部署MQ服务的ip:15672
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
2.1 基本消息队列 HelloWorld案例
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息
3.0 引入依赖
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <!--单元测试-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- </dependency>
3.1 基本消息队列的消息发送流程:
建立connection
创建channel
利用channel声明队列
利用channel向队列发送消息
- @Test
- public void testSendMessage() throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("192.168.150.101");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("itcast");
- factory.setPassword("123321");
- // 1.2.建立连接
- Connection connection = factory.newConnection();
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
- // 4.发送消息
- String message = "hello, rabbitmq!";
- channel.basicPublish("", queueName, null, message.getBytes());
- System.out.println("发送消息成功:【" + message + "】");
- // 5.关闭通道和连接
- channel.close();
- connection.close();
- }
3.2 基本消息队列的消息接收流程:
建立connection
创建channel
利用channel声明队列
定义consumer的消费行为handleDelivery()
利用channel将消费者与队列绑定
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("192.168.150.101");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("itcast");
- factory.setPassword("123321");
- // 1.2.建立连接
- Connection connection = factory.newConnection();
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
- // 4.订阅消息
- channel.basicConsume(queueName, true, new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 5.处理消息
- String message = new String(body);
- System.out.println("接收到消息:【" + message + "】");
- }
- });
- System.out.println("等待接收消息。。。。");
- }
打印顺序:
等待接收消息。。。。
hello, rabbitmq!
什么是AMQP?
应用间消息通信的一种协议,与语言和平台无关。
SpringAMQP如何发送消息?
引入amqp的starter依赖 配置RabbitMQ地址
利用RabbitTemplate的convertAndSend方法
基本消息队列的消息发送流程
@Autowired
private RabbitTemplate rabbitTemplate;步骤1:引入AMQP依
因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中:
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>步骤2:在publisher中编写测试方法,向simple.queue发送消息
2.1在publisher服务中编写application.yml,添加mq连接信息:
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码2.2 在publisher服务中新建一个测试类,编写测试方法:
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
基本消息队列的消息接收流程 @RabbitListener(queues = "simple.queue")
SpringAMQP如何接收消息?
引入amqp的starter依赖
配置RabbitMQ地址
定义类,添加@Component注解
类中声明方法,添加@RabbitListener注解,方法参数就是消息
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
步骤1:引入AMQP依赖
因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中:
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>步骤2:在consumer中编写消费逻辑,监听simple.queue
2.1 在consumer服务中编写application.yml,添加mq连接信息:
spring: rabbitmq: host: 192.168.150.101 # rabbitMQ的ip地址 port: 5672 # 端口 username: itcast password: 123321 virtual-host: /2.2 在consumer服务中新建一个类,编写消费逻辑:
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者接收到消息:【" + msg + "】" + LocalTime.now()); } }
1 Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
2 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
3 通过设置prefetch来控制消费者预取的消息数量
模拟WorkQueue,实现一个队列绑定多个消费者
基本思路如下:
1 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
2 在consumer服务中定义两个消息监听者,都监听simple.queue队列
3 消费者1每秒处理50条消息,消费者2每秒处理10条消息
步骤1:生产者循环发送消息到simple.queue
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMessage2WorkQueue() throws InterruptedException {
- String queueName = "simple.queue";
- String message = "hello, message__";
- for (int i = 1; i <= 50; i++) {
- rabbitTemplate.convertAndSend(queueName, message + i);
- Thread.sleep(20);
- }
- }
步骤2:编写两个消费者,都监听simple.queue
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(queues = "simple.queue")
- public void listenWorkQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
- Thread.sleep(20);
- }
-
- @RabbitListener(queues = "simple.queue")
- public void listenWorkQueue2(String msg) throws InterruptedException {
- System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
- Thread.sleep(200);
- }
- }
步骤3:消费预取限制
因为消费预取数量默认是不限制 因为发送50条事件 所以两个消费服务分别分的25条事件 消费者1处理完事件用时25*20 ms 消费者处理完事件用时25*200 ms
修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限:
- spring:
- rabbitmq:
- host: 192.168.150.101 # rabbitMQ的ip地址
- port: 5672 # 端口
- username: itcast
- password: 123321
- virtual-host: /
- listener:
- simple:
- prefetch: 1 #处理一条事件取一条事件
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
常见exchange类型包括:
Fanout:广播
Direct:路由
Topic:话题
实现思路如下:
1 在consumer服务中,利用代码声明队列、交换机,并将两者绑定
2 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
3 在publisher中编写测试方法,向itcast.fanout发送消息
步骤1:在consumer服务(消费)声明Exchange、Queue、Binding
在consumer服务创建一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:
- @Configuration
- public class FanoutConfig {
- // itcast.fanout 声明FanoutExchange交换机
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("itcast.fanout");
- }
- // fanout.queue1 声明第1个队列
- @Bean
- public Queue fanoutQueue1(){
- return new Queue("fanout.queue1");
- }
- // 绑定队列1到交换机
- @Bean
- public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
- return BindingBuilder
- .bind(fanoutQueue1)
- .to(fanoutExchange);
- }
-
- //... 略, 以相同方式声明第2个队列,并完成绑定
- }
步骤2:在consumer服务声明两个消费者
在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(queues = "fanout.queue1")
- public void listenFanoutQueue1(String msg) {
- System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
- }
- @RabbitListener(queues = "fanout.queue2")
- public void listenFanoutQueue2(String msg) {
- System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
- }
- }
步骤3:在publisher服务发送消息到FanoutExchange
在publisher服务的SpringAmqpTest类中添加测试方法:
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void testSendFanoutExchange() {
- // 交换机名称
- String exchangeName = "itcast.fanout";
- // 消息
- String message = "hello, every one!";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "", message);
- }
总结:
交换机的作用是什么?
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
不能缓存消息,路由失败,消息丢失
FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
Queue
FanoutExchange
Binding
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
1 每一个Queue都与Exchange设置一个BindingKey
2 发布者发送消息时,指定消息的RoutingKey
3 Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
实现思路如下:
1 利用@RabbitListener声明Exchange、Queue、RoutingKey
2 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
3 在publisher中编写测试方法,向itcast. direct发送消息
1 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
2 并利用@RabbitListener声明Exchange、Queue、RoutingKey
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1"),
- exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
- key = {"red", "blue"}
- ))
- public void listenDirectQueue1(String msg){
- System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"),
- exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
- key = {"blue", "yellow"}
- ))
- public void listenDirectQueue2(String msg){
- System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
- }
- }
在publisher服务的SpringAmqpTest类中添加测试方法:
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void testSendDirectExchange() {
- // 交换机名称
- String exchangeName = "itcast.direct";
- // 消息
- String message = "hello, red!";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "red", message);
- }
结果:消费者1接收到direct.queue1的消息:【hello, red!】
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void testSendDirectExchange() {
- // 交换机名称
- String exchangeName = "itcast.direct";
- // 消息
- String message = "hello, blue!";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "blue", message);
- }
结果:
消费者1接收到direct.queue1的消息:【hello, blue!】
消费者2接收到direct.queue2的消息:【hello, blue!】
总结:
描述下Direct交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
@Queue
@Exchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
实现思路如下:
1 并利用@RabbitListener声明Exchange、Queue、RoutingKey
2 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
3 在publisher中编写测试方法,向itcast. topic发送消息
步骤1:在consumer服务声明Exchange、Queue
1 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
2 并利用@RabbitListener声明Exchange、Queue、RoutingKey
- @Component
- public class SpringRabbitListener {
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue1"),
- exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
- key = "china.#"
- ))
- public void listenTopicQueue1(String msg){
- System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue2"),
- exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
- key = "#.news"
- ))
- public void listenTopicQueue2(String msg){
- System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
- }
-
-
- }
在publisher服务的SpringAmqpTest类中添加测试方法:
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void testSendTopicExchange() {
- // 交换机名称
- String exchangeName = "itcast.topic";
- // 消息
- String message = "今天天气不错,我的心情好极了!";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
- }
结果:
消费者1和消费者2都能接受到消息
总结:
描述下Direct交换机与Topic交换机的差异?
Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词
说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
1 我们在publisher或consumer中利用@Bean声明一个队列:
- @Configuration
- public class FanoutConfig {
-
- @Bean
- public Queue objectQueue(){
- return new Queue("object.queue");
- }
- }
2 在publisher中发送消息以测试:
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMap() throws InterruptedException {
- // 准备消息
- Map<String,Object> msg = new HashMap<>();
- msg.put("name", "Jack");
- msg.put("age", 21);
- // 发送消息
- rabbitTemplate.convertAndSend("object.queue", msg);
- }
3 设置消息转换器
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化
步骤如下:
1 我们在publisher服务引入依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>2 我们在publisher服务声明MessageConverter:
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }3 在consumer服务引入Jackson依赖:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>4 在consumer服务定义MessageConverter:
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }
4 然后定义一个消费者,监听object.queue队列并消费消息:
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(queues = "object.queue")
- public void listenObjectQueue(Map<String,Object> msg){
- System.out.println("接收到object.queue的消息:" + msg);
- }
- }
总结:
SpringAMQP中消息的序列化和反序列化是怎么实现的?
利用MessageConverter实现的,默认是JDK的序列化
注意发送方与接收方必须使用相同的MessageConverter
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。