赞
踩
首先我们需要知道topic的概念。
生产者在推送消息时,会确定topic和topic中的那个partition。
一个消费者组内每个消费者负责消费 一个topic中不同分区的数据,同一个分区同时只能由一个组内消费者消费
对于生产者,在推送消息的时候,有以下几种方式来确定topic、topic中的partition。
将 producer 发送的数据封装成一个 ProducerRecord 对象。
1.指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
2.没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
3.既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法。
1.1、总体概况
为保证 producer 发送的数据,能可靠的发送到指定的 topic, topic 的每个 partition 收到producer 发送的数据后,并等待该分区中全部的follower同步完成,该分区的leader才向 producer 发送 ack(acknowledgement: 确认收到),如果producer 收到 ack, 就会进行下一轮的发送,否则重新发送数据。
而为了处理follower在同步数据时发生故障,导致leader一直等待下去的情况,新增了ISR的机制。
1.1.2、什么是ISR呢?
Leader 维护了一个动态的 in-sync replica set (ISR:同步副本),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 producer 发送 ack。如果 follower长时间未向leader同步数据,则该 follower 将被踢出 ISR,该时间阈值由replica.lag.time.max.ms参数设定。而如果Leader 发生故障,就会从 ISR 中选举出新的 leader。
1.1.3、ACK机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功,才返回ack。
所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks 参数配置:
1.1.4、三种语义
At Most Once 语义:
At Least Once 语义:
Exactly Once 语义:
1.2、过程总结
总结以上,可以得知生产者在推送消息时,依靠的是ISR、ACK机制、以及三种语义来达到不同情况的消息准确性。
所以总的过程应该是这样的: producer 向指定的 topic和partition发送数据, topic 的每个 partition 收到producer 发送的数据后,(下一步是等待ISR的follower同步完成,这一步会根据ack的参数配置[0,1,-1],确定具体的ack返回时机),该分区的leader向 producer 发送 ack(acknowledgement: 确认收到),如果producer 收到 ack, 就会进行下一轮的发送,否则重新发送数据。
而如果要保证生产者推送到服务器里的消息数据即不重复又不丢失,就要使用Exactly Once语义:将ack参数配置为-1,并开启幂等性(enable.idempotence= true)。
1.3、follower与leader出故障,怎么保证数据的一致性
follower 和 leader 发生故障了,该怎么处理。
follower 故障和 leader 故障:
注意: 这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。ack是负责数据丢失的
consumer 采用 pull(拉) 模式从 broker 中读取数据。这个过程只涉及到了服务器和消费者两方,那消费者是怎么保证不丢失和不重复的获取消息呢?
关键在于consumer会维护一个offset,该offset实时记录着自己消费的位置。同时消费者能见到的最大的 offset,是HW, 是ISR 队列中最小的 LEO【这一点看1.3】,所以只要保证offset不出错,那消息就不会丢失或者重复消费。但是offset的维护并不是那么简单,它分为好几种方式。
offset的维护方式:
自动提交
enable.auto.commit:是否开启自动提交 offset 功能,消费者只在启动的时候去访问offset的值,如果将该值配置为false,就要手动提交offset,否则offset就不会更新。
auto.commit.interval.ms:自动提交 offset 的时间间隔
手动提交
commitSync(同步提交)
commitAsync(异步提交)
两者的相同点是:都会将本次 poll 的一批数据最高的偏移量提交;
不同点是:commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。
自定义存储offset
offset 的维护是相当繁琐的, 因为需要考虑到很多东西,例如消费者的 Rebalace。
在业务的运用中。
对于消息重复,这个影响不是很严重,无论是生产者重复推送数据,还是消费者重复拉取数据,只要在消费端落库时,手动做去重就可以了。
对于消息丢失:
————————————————
版权声明:本文为CSDN博主「努力的布布」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_37200262/article/details/125267714
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。