赞
踩
生产者保证不丢消息(消息重试,开启消息确认机制)
存储端不丢消息(持久化存储、同步刷盘和异步刷盘)
消费者不丢消息(消息确认机制手动ack等)
集群部署(主从复制、镜像模式)
消息队列(MQ)系统如 RabbitMQ、Kafka 和 RocketMQ 等在实现消息不丢失的可靠性方面都提供了一些机制,具体如下:
1.存储端不丢消息
消息持久化到硬盘 ,开启 持久化磁盘配置
2.生产者保证不丢消息
transactoin 开启事务
confirm 开启消息确认机制
消息确认机制 -必须确认消息成功刷盘到硬盘中,才能够人为消息投递成功。
3.消费者
必须确认消息消费成功
rabbitmq 中:才会将该消息删除,手动提交
rocketmq 或者 kafka 中:才会提交 offset
存储端
刷盘机制分同步刷盘和异步刷盘
生产者消息发过来时,只有持久化到磁盘,RocketMQ 存储端 Broker 才返回一个成功 ACK 响应,这就同步刷盘。它保证消息不丢失,但影响了性能。
异步刷盘话,只要消息写入 PageCache 缓存,就返回一个成功ACK 响应。这样提高了 MQ 性能,但如果这时候机器断电了,就会丢失消息。Broker 一般集群部署,有 master 主节点和 slave 从节点。消息到Broker 存储端,只有主节点和从节点都写入成功,才反馈成功 ack 给生产者。这就同步复制,它保证了消息不丢失,但降低了系统吞吐量。与之对应就异步复制,只要消息写入主节点成功,就返回成功ack,它速度快,但会有性能问题。
把消息持久化磁盘,保证服务器重启消息不丢失。每个集群中至少有一个物理磁盘,保证消息落入磁盘。
RabbitMQ 通过持久化消息和队列、消息确认机制、消息重试机制、备份队列和镜像队列等方式来确保消息在传输和消费过程中不会丢失,提高了消息队列系统的可靠性和稳定性。
RabbitMQ 是一个流行的开源消息队列系统,为了确保消息的可靠传输,RabbitMQ 采用了以下几种机制:
1.生产者:RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;
● 通过事务实现
● 通过发送方确认机制(publisher confirm)实现
1.1事务机制:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。这种方式有个缺点:吞吐量下降;
事务实现
● channel.txSelect(): 将当前信道设置成事务模式
● channel.txCommit(): 用于提交事务
● channel.txRollback(): 用于回滚事务
通过事务实现机制,只有消息成功被rabbitmq服务器接收,事务才能提交成功,否则便可在捕获异常之后进行回滚,然后进行消息重发,但是事务非常影响rabbitmq的性能。还有就是事务机制是阻塞的过程,只有等待服务器回应之后才会处理下一条消息
1.2.confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
confirm方式有三种模式:普通confirm模式、批量confirm模式、异步confirm模式
● channel.confirmSelect(): 将当前信道设置成了confirm模式
普通confirm模式
每发送一条消息,就调用waitForConfirms()方法,等待服务端返回Ack或者nack消息
3.消费者
消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可
消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息;
解决方案:处理消息成功后,手动回复确认消息。
消息接收确认机制,分为消息自动确认模式和消息手动确认模式,当消息确认后,我们队列中的消息将会移除
那这两种模式要如何选择
● 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便。好处就是可以提高吞吐量,缺点就是会丢失消息
● 如果消息非常重要,不容丢失,则建议手动ACK,正常情况都是更建议使用手动ACK。虽然可以解决消息不会丢失的问题,但是可能会造成消费者过载
注:自动确认模式,消费者不会判断消费者是否成功接收到消息,也就是当我们程序代码有问题,我们的消息都会被自动确认,消息被自动确认了,我们队列就会移除该消息,这就会造成我们的消息丢失
RocketMQ 通过同步复制、刷盘机制、消息重试机制、消息确认机制以及高可用性集群架构等方式,确保消息在传输和消费过程中不会丢失,提高了消息队列系统的可靠性和稳定性。
Consumer端,消费正常后再进行手动ack确认
发送消息如果失败或者超时,则重新发送
RocketMQ 是一个开源的分布式消息队列系统,为了确保消息的可靠性,RocketMQ 采取了以下几种机制:
生产者
生产端如何保证不丢消息呢?确保生产消息能到达存储端。
如果RocketMQ 消息中间件,Producer 生产者提供了三种发送消息方式,分别:同步发送、异步发送、单向发送生产者要想发息时保证消息不丢失,可以:
采用同步方式发送,send 消 息方法返回成功状态,就表示消息息正常到达了存储端Broker。如果 send 消息异常或者返回非成功状态,可以重试。可以使用事务消息,RocketMQ 事务消息机制就为了保证零丢失来设计
Kafka 通过持久性存储、复制机制、ISR 机制、消息复制确认机制和消息复制和同步机制等方式来保证消息在传输和存储过程中不会丢失,提高了消息队列系统的可靠性和稳定性。
服务器端持久化设置为同步刷盘持久化
生产者设置为同步投递(重试)
消费端设置为手动提交
Kafka 通过以下方式来保证消息不丢失:
服务器端持久化设置为同步刷盘
首先第一个是服务器端。设置broker中的配置项unclean.leader.election.enable = false,保证所有副本同步。同时,Producer将消息投递到服务器的时候,我们需要将消息持久化,也就是说会同步到磁盘。注意,同步到硬盘的过程中,会有同步刷盘和异步刷盘。如果选择的是同步刷盘,那是一定会保证消息不丢失的。就算刷盘失败,也可以即时补偿。但如果选择的是异步刷盘的话,这个时候,消息有一定概率会丢失。网上有一种说法,说Kafka不支持同步刷盘,这种说法也不能说是错的。但是可以通过参数的配置变成同步刷盘,比如,这样的配置:
当达到下面的消息数量时,会将数据flush到日志文件中。默认10000
#log.flush.interval.messages=10000
#当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
#log.flush.interval.ms=1000
检查是否需要将日志flush的时间间隔
log.flush.scheduler.interval.ms = 3000
试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新
选出一个 leader ,但是leader 的数据还有一些没有被 follower 副本的同步的话,就会造
成消息丢失。 当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时
就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低
了消息丢失的可能性
就是生产者Producer,使用带回调通知的send(msg,callback)方法,并且设置acks = all 。
它的消息投递要采用同步的方式。Producer要保证消息到达服务器,就需要使用到消息确认机制,也就是说,必须要确保消息投递到服务端,并且得到投递成功的响应,确认服务器已接收,才会继续往下执行。那如果,Producer将消息投递到服务器端,服务器来没来得及接收就已经宕机了,那投递过来的消息岂不是丢失了,怎么办呢?大家不要慌,在Producer投递消息时,都会记录日志,然后再将消息投递到服务器端,就算服务器宕机了,等服务器重启之后,也可以根据日志信息完成消息补偿,确保消息不丢失。
生产者丢失消息的情况
生产者(Producer) 调用 send 方法发送消息之后,消息可能因为网络问题并没有发送过去。
为了确定消息是发送成功,我们要判断消息发送的结果,Kafka 生产者(Producer) 使用 send
方法发送消息实际上是异步的操作,我们可以通过 get()方法获取调用结果,但是这样也让它
变为了同步操作,可以采用为其添加回调函数的形式,示例代码如下:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info(“生产者成功发送消息到 topic:{} partition:{}的消息”,
result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error(“生产者发送消失败,原因:{}”, ex.getMessage()));
Producer的retries(重试次数)设置一个比较合理的值,一般是3 ,但是为了保证消息不
丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发
送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显
了,网络波动一次你3次一下子就重试完了
设置enable.auto.commit为false
在Kafka中,消息消费完成之后,它不会立即删除,而是使用定时清除策略,也就是说,我们消费者要确保消费成功之后,手动ACK提交。如果消费失败的情况下,我们要不断地进行重试。所以,消费端不要设置自动提交,一定设置为手动提交才能保证消息不丢失。
消费者丢失消息的情况
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个 问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际 上并没有被消费,但是 offset 却被自动提交了。
解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后再自己手 动提交 offset 。 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你 刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两 次。
ActiveMQ 是一个流行的开源消息中间件,为了确保消息的可靠传输,ActiveMQ 采用了以下几种机制:
不会,因为我们消息会持久化在我们硬盘中
线上生产环境中运行时,你必须要考虑到消费者服务可能宕机的问题。
实际上无论是RocketMQ、Kafka还是RabbitMQ,都有类似的autoAck或者是手动ack的机制。
首先,我们需要把那个参数从true改为false,如
// 关闭自动ACK,改为手动确认
channel.basicConsume(QUEUE_NAME, false, "myConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
//手动进行应答,这时消息队列会删除该消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
只要修改为false之后,RabbitMQ就不会盲目的投递消息到仓储服务,立马就删除消息了,说白了就是关
闭autoAck的行为,不要自作主张的认为消息处理成功了。
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。
这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。
这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢?
这里顺便说一下吧,其实也很容易,就下面两步
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。