赞
踩
消息可靠性 消息至少被消费一次
延迟消息 消息的延迟发送
高可用:单机容易挂机,避免单点故障
消息堆积 百万级数据堆积,无法及时消费如何处理
简单模式:
工作队列模式:
publish/subscrobe(发布订阅模式):
routing();
topics()
发送时丢失消息没有成功投递到交换机
消息未路由到队列
MQ宕机,queue将消息丢失
消费者获取消息未消费 宕机
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定唯一的ID.消息发送成功后会返回结果给发送者,表示消息是否发送成功
返回结果的两种方式
publisher-confirm:发送者确认
消息成功投递到交换机,返回ack
消息未成功投递到交换机,返回nack
publisher-return, 发送者回执
消息投递到交换机,但是没有路由到队列,返回ack,以及路由失败的原因
** 确认机制发送消息的时候需要给每个消息设置一个全局唯一的id,以区分不同消息,避免ack冲突
MQ宕机导致消息消失:
进行持久化:
交换机,队列,消息都要进行持久化
开启publisher-confirm
在配置文件中配置:
- spring:
- rabbitmq:
- publisher-confirm-type: correlated
- publisher-returns: true
- template:
- mandatory: true
publisher-confirm-type:开启publisher-confirm,支持两种类型
simple:同步等待confirm结果,直到超时
correlated:异步回调,次你故意ConfirmCallback,MQ返回结果时会回调这个ComfirmCallback
publish-returns:开启publish-return功能,同样基于callback机制,不过时定义ReturnCallback
template.mandatory:开启消息路由失败时的策略.turn,则调用ReturnCallback; false:直接丢弃数据
代码实现 publisher-confirm 当发送信息到交换机失败时
配置文件
- spring:
- rabbitmq:
- host: ***.***.**.*** # 主机地址
- port: 5672 # 端口号
- username: itcast #mq账号
- password: 123321 #mq密码
- virtual-host: /
- publisher-confirm-type: correlated #开启 publisher-confirm 设置同步类型
- publisher-returns: true #开启publisher-returns
- template:
- mandatory: true # 开启消息路由失败时的策略.turn,则调用ReturnCallback; false:直接丢弃数据
- redis:
- host: ***.***.**.*** #redis地址
- password: 123456 #连接redis密码
代码
- @GetMapping("/sendMessage3")
- public String sendMessage3() {
- //设置mq中的交换机,队列,以及要发送的消息
- String exchange = "direct.exchange";
- String routingKey = "jj";
- String message = "i love mm";
- //设置全局唯一的id,以区分不同消息,避免ack冲突
- String id = UUID.randomUUID().toString();
-
- //将交换机,队列,消息 信息放入redis中
- Map<String, String> map = new HashMap<>();
- map.put("exchange", exchange);
- map.put("routingKey", routingKey);
- map.put("message", message);
-
- //讲数据设置到redis中
- redisTemplate.opsForHash().putAll(id, map);
- //发送消息到mq中
- rabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData(id));
-
- return "ok";
- }
-
- @PostConstruct
- public void init() {
- log.info("初始化方法运行");
- //设置回调函数,向mq中发送消息失败的时候由框架自动调用
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- String id = correlationData.getId();
- log.info("id是:id{}" , id);
- log.info("b是: b{}" , b);
- log.info("message s{}" , s);
- if (!b) {
- Map<String,String> map = redisTemplate.opsForHash().entries(id);
- String exchange = map.get("exchange");
- String routingKey = map.get("routingKey");
- String message = map.get("message");
- log.info(exchange+" : "+routingKey+" : "+message);
- } else {
- redisTemplate.delete(id);
- }
-
- }
- });
- }
因为生产者可以确保消息发送到队列中,但是消息发送到MQ以后如果MQ宕机,可能导致消息丢失,所以需要开启消息持久化机制.
交换机持久化
队列持久化
消息持久化
交换机持久化
RabbitMQ中交换机默认是非持久化的,mq重启之后会丢失数据.可以通过代码创建交换机时候持久化
- @Bean
- public DirectExchange simpleExchange(){
- // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
- return new DirectExchange("simple.direct", true, false);
- }
在SpringAMQP声明交换的都是持久化的 持久化的交换机在RabbitMQ控制台看会带上D的标识
队列默认也是非持久化的,可以通过代码指定
- @Bean
- public Queue simpleQueue(){
- // 使用QueueBuilder构建队列,durable就是持久化的
- return QueueBuilder.durable("simple.queue").build();
- }
和交换机一样 在RabbitMQ控制台去看的时候也是带上了D
默认情况下SpringAMQP发出的任何消息都是持久化的,不用特意指定,所以消息的持久化我就不写了,累趴.
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
SpringAMQP则允许配置三种确认模式:
•manual:手动ack,需要在业务代码结束后,调用api发送ack。
•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- listener:
- simple:
- retry:
- acknowledge-mode: auto # 自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- # none #消息投递是不可靠的,可能丢失
- # manual:手动ack,需要在业务代码结束后,调用api发送ack。
当消费者出现异常之后,消息会不断的requeue到队列中,重新发给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升,带来不必要的压力
本地重试
利用Spring的retry机制,当消费者出现异常的时候利用本地重试,而不是无限制的requeue到mq队列
配置文件中加入
- spring:
- rabbitmq:
- listener:
- simple:
- retry:
- enabled: true # 开启消费者失败重试
- initial-interval: 1000 # 初识的失败等待时长为1秒
- multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
- max-attempts: 3 # 最大重试次数
- stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
- //听说这样很优雅,但是没用过
- @Configuration
- public class ErrorMessageConfig {
- @Bean
- public DirectExchange errorMessageExchange(){
- return new DirectExchange("error.direct");
- }
- @Bean
- public Queue errorQueue(){
- return new Queue("error.queue", true);
- }
- @Bean
- public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
- return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
- }
-
- @Bean
- public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }
什么是死信交换机?
当一个队列中的消息满足下列情况之一的时候,称为死信
消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue的参数设置为false
消息是一个过期消息,超时无人消费
要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机
MQ集群
挺高可用性,防止单点故障
特点
在那个节点上创建队列,该队列只会在该节点上存在
节点之间会共享部分信息,无法共享数据
搭建流程
规划3个rabbitma
mq1,mq2,mq3(三个主机名称,主机名用来代替ip)
管理控制台端口 8081 8082 8083
通信端口 8071 8072 8073
获取erlang语言的cookie值
进入之前安装的mq容器中查看文件
docker exec -it mq bash
cat /var/lib/rabbitmq/.erlang.cookie
复制上一步获取到的cookie值
创建一个记录cookie的文件.erlang.cookie,所有节点必须一样
创建一个集群配置文件 rabbitmq.conf,该集群配置文件,所有的节点必须一样
创建一docker网络,保证三个mq在同一个网络中
创建三个mq节点,保证挂载同一个集群文件和cookie文件
本质是主从模式
交换机,队列,队列中的信息会在各个mq的镜像节点之间同步备份
创建队列的节点被称为该队列的主节点,备份带其他节点的叫做该队列的镜像节点
一个队列的主节点可能是另外一个队列的镜像节点
所有操作都是主节点完成,然后同步给镜像节点
主节点宕机之后,镜像节点会替代成新的主
创建
镜像集群存在的问题:镜像集群基于最终一致性,会导致消息丢失
最终一致性:
只要生产者将消息成功发送到主节点的队列 主节点就会返回 ack
接下来主节点才会将消息同步到其他的镜像节点
如果在此过程中主节点宕机将会导致消息丢失
可以采用强一致性来解决,就是在生产者发送消息到主节点的队列中 等到主节点将消息同步到其他的镜像节点中才返回ack
仲裁队列默认会选择五个镜像节点
概念:当生产者发送消息的速度超过了消费者处理的速度,就会导致对立中的消息堆积,知道队列存储消息达到上限,之后发送的消息就会成为死信,可能会被丢弃.
解决思路:
增加更多的消费者,提高消费速度,也就是work/queue模式
使用惰性队列,增加队列存储消息的容量
在消费者内开启线程池加快消息处理速度(线程池也要消耗资源,不推荐)
消息队列中的消息存储在哪里
普通队列
消息存储在内存中
如果开启了持久化,消息除了在内存中储存,超过内存阈值上限消息会持久化到硬盘
惰性队列
惰性队列中的消息存在磁盘上
消费者如果要消费消息,需要从磁盘读取到内存
- @Bean
- public Queue lazyQueue(){
- return QueueBuilder
- .durable("lazy.queue")
- .lazy() // 开启x-queue-mode为lazy
- .build();
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。