赞
踩
上一篇blog大致了解了Kafka的基本工作流程和存储机制,其实是从分布式系统均特有的角度去思考的,那么这篇blog就介绍下Kafka的生产者相关策略,其实归根到底也是从支持分布式系统特性【高可扩展、高可用、高并发、海量存储】的角度去考虑的。
Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。
为什么要分区呢?上一篇blog【Kafka从入门到放弃系列 三】Kafka架构深入——工作流程和存储机制已经详细介绍过了,我们再来强调一下:
如上图所示展示的,我们分布式集群的特性才能体现出来,其实不光是Kafka,所有的分布式中间件,都会有这种概念,例如ElasticSearch里也有node节点,索引、分片和复制,就分别对应Kafka的broker,topic、分区和副本一通百通。
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。producer 发送消息到 broker 时,既然分区了,我们怎么知道生产者的消息该发往哪个分区呢?producer 会根据分区算法选择将其存储到哪一个 partition。
从代码结构里我们可以看到实际上可以归纳为三种方法,也就是三种路由机制,决定消息被发往哪个分区,分别是:
明白了消息是如何被发往分区的之后,我们解了高可扩展和高并发,我们还需要思考明白一个问题,如何保证高可扩展,也就是数据如何可靠传输。
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:
request.required.acks = 0,producer 不停向leader发送数据,而不需要 leader 反馈成功消息,这种情况下数据传输效率最高,但是数据可靠性确是最低的。可能在发送过程中丢失数据,可能在 leader 宕机时丢失数据。【传输效率最高,可靠性最低】
request.required.acks = 1,这是默认情况,即:producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的其它副本还没有来得及拉取该消息,如果此时 leader 宕机了,那么此次发送的消息就会丢失。【传输效率中,可靠性中】
request.required.acks = -1(all),producer 发送数据给 leader,leader 收到数据后要等到 ISR 列表中的所有副本都同步数据完成后(强一致性),才向生产者返回成功消息,如果一直收不到成功消息,则认为发送数据失败会自动重发数据。这是可靠性最高的方案,当然,性能也会受到一定影响。【传输效率低,可靠性高】,同时如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复
当 request.required.acks = -1时需要注意,如果要提高数据的可靠性,在设置 request.required.acks=-1 的同时,还需参数 min.insync.replicas 配合,如此才能发挥最大的功效。min.insync.replicas 这个参数用于设定 ISR 中的最小副本数,默认值为1,当且仅当 request.required.acks 参数设置为-1时,此参数才生效。当 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。通过将参数 min.insync.replicas 设置为 2,当 ISR 中实际副本数为 1 时(只有leader),将无法保证可靠性,因为如果发送ack后leader宕机,那么此时该条消息就会被丢失,所以应该拒绝客户端的写请求以防止消息丢失。
那么到底多少foller副本同步完成,才发送ack呢?现有的两种方案选择第二种,第一种占用的机器资源过多,造成了大量的数据冗余,而网络延迟对于Kafka的影响并不大。
采用全量副本同步方案后,我们发送ack的时机确定如下:leader 收到数据,所有 follower 都开始同步数据,但是设想如下情况:有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么解决呢?我们引入ISR的概念
在这种机制下,ISR始终是动态保持稳定的集群,消息来了之后,leader先读取,然后推送到各个follwer里,保证ISR中各个副本处于同步状态,leader挂掉后,立即能从ISR中选举新的leader来处理消息。
在数据可靠性保障策略中我们了解到如何通过分区和副本,以及动态的ISR和ack机制来确保消息的可靠,那么接下来深入探讨下,故障发生的时候,我们如何将集群恢复正常?
在数据可靠性保障策略中我们了解到如何通过分区和副本,以及动态的ISR和ACK机制来确保消息的可靠,那么接下来深入探讨下,故障发生的时候,我们如何将集群恢复正常?首先需要明确两个概念:LEO和HW:
HW和LEO在消息流转的过程如下:
Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性,而 Kafka 使用 ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡【同步复制并干掉复制慢的副本】
当不同的机器宕机故障时来看看ISR如何处理集群以及消息,分为 follower 故障和leader故障:
总而言之,要以所有副本都同步好的最新的HW为准。但这只是处理方法,并不能保证数据不重复或者不丢失,我们来看一种数据重复的案例: Leader宕机:考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader。
这样就出现了数据重复的现象,所以HW&LEO机制只能保证副本之间保持同步,并不能保证数据不重复或不丢失,要想都保证,需要结合ACK级别食用
在可能发生的故障中,当Leader挂了的时候我们需要选举新的leader,遵循如下策略:Kafka 在 ZooKeeper 中为每一个 partition 动态的维护了一个 ISR,这个 ISR 里的所有 replica 都与 leader 保持同步,只有 ISR 里的成员才能有被选为 leader 的可能。
当然也有 极端情况:当 ISR 中至少有一个 follower 时(ISR 包括 leader),Kafka 可以确保已经 commit 的消息不丢失,但如果某一个 partition 的所有 replica 都挂了,自然就无法保证数据不丢失了。这种情况下如何进行 leader 选举呢?通常有两种方案:
如果一定要等待 ISR 中的 replica 恢复过来,不可用的时间就可能会相对较长。而且如果 ISR 中所有的 replica 都无法恢复了,或者数据丢失了,这个 partition 将永远不可用。选择第一个恢复过来的 replica 作为 leader,如果这个 replica 不是 ISR 中的 replica,那么,它可能并不具备所有已经 commit 的消息,从而造成消息丢失。默认情况下,Kafka 采用第二种策略,即 unclean.leader.election.enable=true,也可以将此参数设置为 false 来启用第一种策略
在了解了如何确保副本之间保持同步的故障恢复机制以及如何确保数据可靠的ACK机制之后,我们来探讨下如何保证数据传输的幂等。
对于一些非常重要的信息,消费者要求数据既不重复也不丢失,即 Exactly Once 语义。在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。即:At Least Once + 幂等性 = Exactly Once
要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
为了实现跨分区跨会话的事务以及防止PID重启造成的数据重复,需要引入一个Topic全局唯一的 Transaction ID,并将 Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的TransactionID 获得原来的 PID。为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
本篇blog详细叙述了Kafka的生产者策略,从分区机制到数据可靠性机制,再到故障恢复机制,最后介绍了如何实现消息的Exactly Once语意,感觉Kafka的主要策略还是集中在生产者端的。理解起来比较复杂。不过以调度换资源嘛,通过复杂的调度来节省资源,还是有好处的
部分内容引自https://gitbook.cn/books/5ae1e77197c22f130e67ec4e/index.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。