赞
踩
mq: rabbitmq, rocketmq, kafka
RocketMQ是如何最大限度的保证消息不丢失
Producer:
默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功
采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失
RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功
Broker
消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中
Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失
cunmser
Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标
如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset
如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作
如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的
默认 Kafka 提供 「at least once」语义的消息传递,允许用户通过在处理消息之前保存 Offset 的方式提供 「at most once」 语义。如果我们可以自己实现消费幂等,理想情况下这个系统的消息传递就是严格的「exactly once」, 也就是保证不丢失、且只会被精确的处理一次,但是这样是很难做到的。
导致 Producer 端消息没有发送成功有以下原因:
网络原因:由于网络抖动导致数据根本就没发送到 Broker 端。
数据原因:消息体太大超出 Broker 承受范围而导致 Broker 拒收消息。
解决问题:
1.request.required.acks:
可靠性最高
, 但也不能保证不丢数据,比如当 ISR 中只剩下 Leader Partition 了, 这样就变成 acks = 1 的情况了。生成者消息确认机制
2.弃用调用发后即焚的方式,使用带回调通知函数的方法进行发送消息: Producer.send(msg, callback)生产者消息同步投递
3.重试次数 retries: Producer 端发送消息的重试次数, 设置为大于0的数
4.重试时间 retry.backoff.ms: 推荐设置为300ms。
Kafka Broker 集群接收到数据后会将数据进行持久化存储到磁盘: 同步刷盘和异步刷盘
kafka 通过「多 Partition (分区)多 Replica(副本)机制」已经可以最大限度
的保证数据不丢失,如果数据已经写入 PageCache 中但是还没来得及刷写到磁盘,此时如果所在 Broker 突然宕机挂掉或者停电,极端情况还是会造成数据丢失。
多分区多副本:设置参数:
unclean.leader.election.enable:false
replication.factor >=3
min.insync.replicas > 1
replication.factor = min.insync.replicas +1
拉取数据、业务逻辑处理、提交消费 Offset 位移信息。
enable.auto.commit = false, 采用手动提交位移的方式。设置为手动提交不断去尝试
对于消费消息重复的情况,业务自己保证幂等性, 保证只成功消费一次即可。
channel.confirmSelect();// 开启发送方确认模式
然后异步监听确认和未确认的消息:
channel.addConfirmListener(new ConfirmListener() {
//消息正确到达broker
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("已收到消息");
//做一些其他处理
}
//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
//做一些其他处理,比如消息重发等
}
});
这样就可以让生产端感知到消息是否投递到RabbitMQ中了
message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。
所有需要给exchange、queue和message都进行持久化:
//第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。
消息保存到数据库中
status=0: 表示生产端将消息发送给了RabbitMQ但还没收到确认。
status=1: 表示RabbitMQ已收到消息。
生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性)。
自动ack机制改为手动ack机制。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//接收到消息,做处理
//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
//出错处理,这里可以让消息重回队列重新发送或直接丢弃消息
}
};
//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
autoAck参数置为false, 那么RabbitMQ服务端的队列分为两部分: 1.等待投递给消费端的消息 2.已经投递给消费端。如果RabbitMQ一直没有收到消费端的确认信号, RabbitMQ会安排该消息重新进入队列(放在队列头部)等待投递给下一个消费者。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。