当前位置:   article > 正文

Kafka中怎么保证消息不会丢失和不重复消费?_kafka保证消息不丢失 不重复消费

kafka保证消息不丢失 不重复消费

从两个方面分析:消息推送和消息消费。也就是生产者和消费者两方面。

首先我们需要知道topic的概念。

  • Topic : 话题,可以理解为一个队列, 生产者和消费者面向的都是一个 topic
  •  一个topic又有多个partition,而每个分区都有若干个副本:一个 leader 和若干个 follower。

生产者在推送消息时,会确定topic和topic中的那个partition。

一个消费者组内每个消费者负责消费 一个topic中不同分区的数据,同一个分区同时只能由一个组内消费者消费

一、生产者推送消息时怎么保证消息不丢失和不重复


对于生产者,在推送消息的时候,有以下几种方式来确定topic、topic中的partition。

将 producer 发送的数据封装成一个 ProducerRecord 对象。


1.指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
2.没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
3.既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法。

1.1、总体概况 


为保证 producer 发送的数据,能可靠的发送到指定的 topic, topic 的每个 partition 收到producer 发送的数据后,并等待该分区中全部的follower同步完成,该分区的leader才向 producer 发送 ack(acknowledgement: 确认收到),如果producer 收到 ack, 就会进行下一轮的发送,否则重新发送数据

而为了处理follower在同步数据时发生故障,导致leader一直等待下去的情况,新增了ISR的机制。

1.1.2、什么是ISR呢?
Leader 维护了一个动态的 in-sync replica set (ISR:同步副本),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 producer 发送 ack。如果 follower长时间未向leader同步数据,则该 follower 将被踢出 ISR,该时间阈值由replica.lag.time.max.ms参数设定。而如果Leader 发生故障,就会从 ISR 中选举出新的 leader。

1.1.3、ACK机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功,才返回ack。

所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。

acks 参数配置:

  • 0: producer 不等待 broker(或者说是leader)的 ack,这一操作提供了一个最低的延迟, broker 一接收到还没有写入磁盘的数据就已经返回,当 broker 故障时有可能丢失数据;
  • 1: producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;
  • -1(all) : producer 等待 broker 的 ack, partition 的 leader 和 ISR 里的follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后, broker 发送 ack 之前, leader 发生故障,那么会造成数据重复。(假如ISR中没有follower,就变成了 ack=1 的情况)

1.1.4、三种语义

At Most Once 语义:

  • 将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。
  • 此语义可以保证数据不重复,但是不能保证数据不丢失。

At Least Once 语义:

  • 将服务器的 ACK 级别设置为-1(all),可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。
  • 此语义可以保证数据不丢失,但是不能保证数据不重复。

Exactly Once 语义:

  • At Least Once + 幂等性 = Exactly Once
  • 幂等性:所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。
  • 要启用幂等性,只需要将 Producer 的参数中 enable.idempotence 设置为 true 即可(此时 ack= -1)。 Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。原理:开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时, Broker 只会持久化一条。
  • 但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区、跨会话的 Exactly Once。(也就是说它只解决单次会话、单个分区里的消息重复问题)

1.2、过程总结
总结以上,可以得知生产者在推送消息时,依靠的是ISR、ACK机制、以及三种语义来达到不同情况的消息准确性。

所以总的过程应该是这样的: producer 向指定的 topic和partition发送数据, topic 的每个 partition 收到producer 发送的数据后,(下一步是等待ISR的follower同步完成,这一步会根据ack的参数配置[0,1,-1],确定具体的ack返回时机),该分区的leader向 producer 发送 ack(acknowledgement: 确认收到),如果producer 收到 ack, 就会进行下一轮的发送,否则重新发送数据。

而如果要保证生产者推送到服务器里的消息数据即不重复又不丢失,就要使用Exactly Once语义:将ack参数配置为-1,并开启幂等性(enable.idempotence= true)。

1.3、follower与leader出故障,怎么保证数据的一致性
follower 和 leader 发生故障了,该怎么处理。

  • LEO:(Log End Offset)每个副本的最后一个offset;
  • HW:(High Watermark)高水位,指的是消费者能见到的最大的 offset, ISR 队列中最小的 LEO;

follower 故障和 leader 故障:

  • follower 故障:follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后, follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
  • leader 故障:leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性, 其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。

注意: 这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。ack是负责数据丢失的

二、消费者丢失消息和重复消费消息的情况


consumer 采用 pull(拉) 模式从 broker 中读取数据。这个过程只涉及到了服务器和消费者两方,那消费者是怎么保证不丢失和不重复的获取消息呢?

关键在于consumer会维护一个offset,该offset实时记录着自己消费的位置。同时消费者能见到的最大的 offset,是HW, 是ISR 队列中最小的 LEO【这一点看1.3】,所以只要保证offset不出错,那消息就不会丢失或者重复消费。但是offset的维护并不是那么简单,它分为好几种方式。

offset的维护方式:

自动提交

enable.auto.commit:是否开启自动提交 offset 功能,消费者只在启动的时候去访问offset的值,如果将该值配置为false,就要手动提交offset,否则offset就不会更新。
auto.commit.interval.ms:自动提交 offset 的时间间隔
手动提交

commitSync(同步提交)
commitAsync(异步提交)
两者的相同点是:都会将本次 poll 的一批数据最高的偏移量提交;
不同点是:commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。
自定义存储offset
offset 的维护是相当繁琐的, 因为需要考虑到很多东西,例如消费者的 Rebalace。

三、总结

在业务的运用中。

对于消息重复,这个影响不是很严重,无论是生产者重复推送数据,还是消费者重复拉取数据,只要在消费端落库时,手动做去重就可以了。

对于消息丢失:

  • consumer端丢失消息的情形比较简单:如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,可以采用手动提交offset:(1)enable.auto.commit=false 关闭自动提交位移、(2)在消息被完整处理之后再手动提交位移
  • 生产者丢失消息是最复杂的情形了。生产者(Producer) 使用 send 方法发送消息实际上是异步的操作,我们可以通过 get()方法获取调用结果,但是这样也让它变为了同步操作,但是一般不推荐这么做!可以采用为其添加回调函数的形式。这个回调函数会在 Producer 收到 ack 时调用,此处就和acks参数配置[1、0、-1]密切相关了
  • 如果消息发送失败的话,我们检查失败的原因之后重新发送即可!另外这里推荐为 Producer 的retries(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次,你3次一下子就重试完了。

————————————————
版权声明:本文为CSDN博主「努力的布布」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_37200262/article/details/125267714

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/485449
推荐阅读
相关标签
  

闽ICP备14008679号