赞
踩
1. 同步变异步
2. 解耦
3. 流量削峰(类似大坝)
如:支付服务流程,支付服务流程本身不需要关心订单服务、仓储服务、短信服务是否完成,只需关心用户是否支付,但因为整个流程的后面三项服务需要支付服务提供的数据,所以没有消息中间件以前需要支付完成之后,再接着同步调用后续流程,耗时长。有了消息中间件之后,后续流程就无需同步执行了,只需让支付服务发送消息到中间件,后续服务订阅中间件消息即可,同时也解决了级联失败问题,后续流程失败,不影响支付服务的返回。下图解:
使用消息中间件前:
同步调用:支付服务——>订单服务——>仓储服务——>短信服务——>完成,总时长500ms
使用消息中间件后:
异步调用:支付服务——>Broker——>完成,总时长60ms,之后的订单服务、仓储服务、短信服务只需要等待订阅的消息中间件通知消费即可。
ActiveMQ现在已经很少使用,社区不太活跃,放弃(但是使用很简单- -)。
RabbitMQ并发能力强、消息延时低、高可用、管理界面丰富,并且最重要的是:社区非常活跃,出现BUG都能及时解决。
Kafka和RocketMQ的特点都是高吞吐量,但是kafka消息可靠性比较一般,而且消息不保证有序性。RocketMQ弥补了Kafka的缺点,不过是阿里开源,社区不太活跃,文档也不够丰富。
# 通过Docker安装RabbitMQ
docker run \
-e RABBITMQ_DEFAULT_USER=root \ # 修改默认用户名
-e RABBITMQ_DEFAULT_PASS=root \ # 修改默认密码
--name mq1 \
--hostname my-rabbit \ # 集区部署需要
-p 15672:15672 \ # 图形化界面端口号(通过该端口访问网页,查看数据情况)
-p 5672:5672 \ # 订阅消息端口号
-d \
rabbitmq:latest
注意!注意!注意!最新版RabbitMQ默认是禁用web界面管理插件的,需要手动开启插件,开启方式如下:
步骤一:进入RabbitMQ容器内部
docker exec -it mq1 /bin/bash
步骤二:开启web界面管理插件
rabbitmq-plugins enable rabbitmq_management:开启web界面管理插件
PS:rabbitmq-plugins list:命令可列出插件的启用和禁用状态
-- 消息的六种模型(使用方式)
RabbitMQ Tutorials — RabbitMQhttps://www.rabbitmq.com/getstarted.html
AMQP:一种高级消息队列协议。AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
1. 导入依赖
- <!-- AMQP依赖,包含RabbitMQ -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2. 配置文件
- spring:
- application:
- name: spirng-boot-rabbitmq
- rabbitmq:
- host: 192.168.0.86 # RabbitMQ所在主机地址
- port: 5672 # 端口
- virtual-host: / # 虚拟机地址,对应root用户
- username: guest # 用户名
- password: guest # 密码,一定不要是数字,容易报错Socket ***
3. 注入RabbitTemplate实例
- @Autowired
- RabbitTemplate rabbitTemplate;
4. 发送消息,调用RabbitTemplate.convertAndSend方法
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- public void testSendMessageWork(){
- String queue = "simple.queue"; //队列名
- String message = "Hello RabbitMQ!!!"; //发送的消息
- rabbitTemplate.convertAndSend(queue,message); //解析并发送消息
- }
-
ps:发送消息需要通过通道发送,使用SpringAMQP后无需关心通道创建,Spring会帮助创建通道,只需关心队列即可。
1. 导入依赖
- <!-- AMQP依赖,包含RabbitMQ -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2. 配置文件
- spring:
- application:
- name: spirng-boot-rabbitmq
- rabbitmq:
- host: 192.168.0.86 # RabbitMQ所在主机地址
- port: 5672 # 端口
- virtual-host: / # 虚拟主机
- username: guest # 用户名
- password: guest # 密码,一定不要是数字,容易报错Socket ***
3. 方法通过@RabbitListener注解监听队列,方法参数就是消息
- /**
- * @RabbitListener(queues = "simple.queue")
- * queues:监听的队列
- * @param message:监听到的消息
- */
- @RabbitListener(queues = "simple.queue")
- public void getMessage(String message){
- System.out.println("监听消息中间件消息为:" + message);
- }
【多个消费者绑定一个队列,多消费者抢夺队列消息】
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
能者多劳:RabbitMQ的消息预取机制导致两个消费者拿到的消息是一样的,不管有多少条消息,都会平均分摊,没有去考虑消费者性能分发消息。所以为了解决该问题,需要在配置文件中配置以下配置:
- spring:
- application:
- name: spirng-boot-rabbitmq
- rabbitmq:
- host: 192.168.0.86 # RabbitMQ所在主机地址
- port: 5672 # 端口
- virtual-host: / # 虚拟主机
- username: root # 用户名
- password: 0722 # 密码
- listener:
- simple:
- prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
-
- #prefetch:修改默认RabbitMQ消息预取机制
该模式相较于基础版区别:提高队列消息处理速度,避免消息堆积。
exchange:交换机;
作用:转发消息到队列【将原本的一个消息(根据类型)分发给多个队列】
发布订阅模式中生产者只需要知道交换机是谁即可。
特点:Fanout类型交换机会将交换机接受到的消息分发给每一个跟其绑定的队列中去。
1. 前面与基本消息模型一样,后续编码有所变化
2. 创建配置类,配置队列绑定交换机。
Queue(队列)、FanoutExchange(广播交换机)、Binding(队列绑定交换机)
- package cn.itcast.mq.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @Autor 白春恒
- *
- * FanoutExchange:广播类型交换机消息模型
- * 该类型会将交换机接受到的消息分发给每一个跟其绑定的队列中去。
- *
- * PS:这种声明交换机与队列方式比较繁琐,简洁方式看路由模式代码
- */
- @Configuration
- public class FanoutConfiguration {
- /**
- * 声明交换机:fanout1
- * @return 注入到SpringIOC容器,交给Spring进行管理
- */
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("fanout1");
- }
-
- /**
- * 声明队列:fanout.queue1
- * @return
- */
- @Bean
- public Queue queue1(){
- return new Queue("fanout.queue1");
- }
-
- /**
- * 绑定queue1到fanoutExchange交换机
- * @param queue1 队列1
- * @param fanoutExchange 广播类型交换机
- * @return
- */
- @Bean
- public Binding bindingQueue1(Queue queue1,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(queue1).to(fanoutExchange);
- }
-
- /**
- * 声明队列:fanout.queue2
- * @return
- */
- @Bean
- public Queue queue2(){
- return new Queue("fanout.queue2");
- }
-
- /**
- * 绑定queue2到fanoutExchange交换机
- * @param queue2 队列2
- * @param fanoutExchange 广播类型交换机
- * @return
- */
- @Bean
- public Binding bindingQueue2(Queue queue2,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(queue2).to(fanoutExchange);
- }
- }
3. 监听消息
- @RabbitListener(queues = "fanout.queue1")
- public void listenFanoutQueue1(String message) {
- System.out.println("监听fanout.queue1得到消息为:" + message);
- }
-
- @RabbitListener(queues = "fanout.queue2")
- public void listenFanoutQueue2(String message) {
- System.out.println("监听fanout.queue2得到消息为:" + message);
- }
1. 前面与基本消息模型一样,后续编码有所变化
2. 发送消息到交换机(FanoutExchange)
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- /**
- * FanoutExchange方式发送消息
- * 先将消息返送到FanoutExchange,再通过交换机将“消息”路由到与其绑定的队列
- */
- @Test
- public void testSendMessageToFanout() {
- String exchange = "fanout1"; //Fanout交换机
- String message = "Hello every one"; //发送的消息
- rabbitTemplate.convertAndSend(exchange,"",message);
- }
广播模式:会将Exchange交换机的消息分发给所有与之绑定的Queue队列;
路由模式:Exchange与Queue之间绑定多了BindingKey,Exchange分发消息时会带上RoutingKey,Queue的BindingKey与RoutingKey保持一致才会收到消息。
1. 前面与基本消息模型一样,后续编码有所变化
2. 声明队列、交换机与RoutingKey,并监听队列
- /**
- * @RabbitListener注解参数:
- * bindings:绑定队列与交换机
- * BindingKey:队列与交换机之间的绑定值,交换机通过该值分发消息到指定队列
- * @param message 监听到的消息
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1"),//声明队列
- exchange = @Exchange(name = "direct1",type = ExchangeTypes.DIRECT),//声明交换机
- key = {"blue","yellow"}//BindingKey
- ))
- public void listenDirectQueue1(String message){
- System.out.println("路由模式DirectQueue1监听到的消息为:" + message);
- }
1. 前面与基本消息模型一样,后续编码有所变化
2. 发送消息到DirectExchange(路由模式)
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- /**
- * DirectExchange方式发送消息
- * 先将消息返送到FanoutExchange,再通过交换机和RoutingKey两个值确认将“消息”分发到与其绑定的队列
- *
- * RoutingKey:对应队列绑定的BindingKey,两值匹配则会发消息到匹配的队列中。
- */
- @Test
- public void testSendMessageToDirect() {
- String exchange = "direct1"; //Fanout交换机
- String message = "Hello yellow"; //发送的消息
- //RoutingKey有值表示发送到指定RoutingKey绑定的队列
- rabbitTemplate.convertAndSend(exchange,"yellow",message);
- }
DirectExchange:路由模式只能监听到指定的RoutingKey
TopicExchange:通配符模式可以监听到被通配到的所有RoutingKey,简化了监听多个RoutingKey的繁琐
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic1.queue1"),
- exchange = @Exchange(name = "topic1",type = ExchangeTypes.TOPIC),
- key = {"china.#"}
- ))
- public void listenTopicQueue1(String message){
- System.out.println("通配符模式TopicQueue1监听到的消息为:" + message);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic1.queue2"),
- exchange = @Exchange(name = "topic1",type = ExchangeTypes.TOPIC),
- key = {"#.news"}
- ))
- public void listenTopicQueue2(String message){
- System.out.println("通配符模式TopicQueue2监听到的消息为:" + message);
- }
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMessageToTopic() {
- String exchange = "topic1"; //Fanout交换机
- String message = "新闻:恭喜发财,身体健康"; //发送的消息
- rabbitTemplate.convertAndSend(exchange,"china.news",message);
- }
1. 添加jackson依赖
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
2. 注入Jackson2JsonMessageConverter到Spring容器中
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
3. 发送消息即可
- @Test
- public void sendMessageToObjectQueue(){
- Map<String,Object> map = new HashMap<String,Object>();
- map.put("name","高圆圆");
- map.put("age",18);
- rabbitTemplate.convertAndSend("object.queue",map);
- }
1. 添加jackson依赖
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
2. 注入Jackson2JsonMessageConverter到Spring容器中
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
3. 接受消息即可
- @RabbitListener(queues = "object.queue")
- public void getObjectQueueMessage(Map<String,Object> map){
- System.out.println("object.queue队列接收到的消息为:" + map);
- }
PS:
1. RabbitMQ的web管理界面进入交换机报错500解决
2. RabbitMQ用户名密码建议设置为字符串,不要全是数字,否则报错连接不上
3. 最新版RabbitMQ默认是禁用web界面管理插件的,需要手动开启插件
步骤一:进入RabbitMQ容器内部
docker exec -it mq1 /bin/bash
步骤二:开启web界面管理插件
rabbitmq-plugins enable rabbitmq_management:开启web界面管理插件
PS:rabbitmq-plugins list:命令可列出插件的启用和禁用状态
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。