赞
踩
kafka需要所有的iSR全部同步完成,则表示同步成功
AR:分区的所有副本
在Kafka中维护了一个AR列表,包括所有的分区的副本。AR又分为ISR和OSR。
AR = ISR + OSR。
AR、ISR、OSR、LEO、HW这些信息都被保存在Zookeeper中。
ISR中的副本都要同步leader中的数据,只有都同步完成了数据才认为是成功提交了,成功提交之后才能供外界访问。
在这个同步的过程中,数据即使已经写入也不能被外界访问,这个过程是通过LEO-HW机制来实现的。
OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。
最开始所有的副本都在ISR中,在kafka工作的过程中,如果某个副本同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。
LogEndOffset:分区的最新的数据的offset,当数据写入leader后,LEO就立即执行该最新数据。相当于最新数据标识位。
HighWatermark:只有写入的数据被同步到所有的ISR中的副本后,数据才认为已提交,HW更新到该位置,HW之前的数据才可以被消费者访问,保证没有同步完成的数据不会被消费者访问到。相当于所有副本同步数据标识位。
在leader宕机后,只能从ISR列表中选取新的leader,无论ISR中哪个副本被选为新的leader,它都知道HW之前的数据,可以保证在切换了leader后,消费者可以继续看到HW之前已经提交的数据。
所以LEO代表已经写入的最新数据位置,而HW表示已经同步完成的数据,只有HW之前的数据才能被外界访问。
如果leader宕机,选出了新的leader,而新的leader并不能保证已经完全同步了之前leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的follower都要将数据截断到HW的位置,再和新的leader同步数据,来保证数据一致。
当宕机的leader恢复,发现新的leader中的数据和自己持有的数据不一致,此时宕机的leader会将自己的数据截断到宕机之前的hw位置,然后同步新leader的数据。宕机的leader活过来也像follower一样同步数据,来保证数据的一致性。
kafka通过topic来分主题存放数据,主题内有分区,分区可以有多个副本,分区的内部还细分为若干个segment。
所谓的分区其实就是在kafka对应存储目录下创建的文件夹,文件夹的名字是主题名加上分区编号,编号从0开始。
所谓的segment其实就是在分区对应的文件夹下产生的文件。
一个分区会被划分成大小相等的若干segment,这样一方面保证了分区的数据被划分到多个文件中保证不会产生体积过大的文件;另一方面可以基于这些segment文件进行历史数据的删除,提高效率。
一个segment又由一个.log和一个.index文件组成。
.log文件为数据文件用来存放数据分段数据。
.index为索引文件保存对对应的.log文件的索引信息。
在.index文件中,保存了对对应.log文件的索引信息,通过查找.index文件可以获知每个存储在当前segment中的offset在.log文件中的开始位置,而每条日志有其固定格式,保存了包括offset编号、日志长度、key的长度等相关信息,通过这个固定格式中的数据可以确定出当前offset的结束位置,从而对数据进行读取。
这两个文件的命名规则为:
partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充。
开始读取指定分区中某个offset对应的数据时,先根据offset和当前分区的所有segment的名称做比较,确定出数据在哪个segment中,再查找该segment的索引文件,确定当前offset在数据文件中的开始位置,最后从该位置开始读取数据文件,在根据数据格式判断结果,获取完整数据。
通过以上的讲解,已经可以保证kafka集群内部的可靠性,但是在生产者向kafka集群发送时,数据经过网络传输,也是不可靠的,可能因为网络延迟、闪断等原因造成数据的丢失。
kafka为生产者提供了如下的三种可靠性级别,通过不同策略保证不同的可靠性保障。
其实此策略配置的就是leader将成功接收消息信息响应给客户端的时机。
通过request.required.acks参数配置:
1:生产者发送数据给leader,leader收到数据后发送成功信息,生产者收到后认为发送数据成功,如果一直收不到成功消息,则生产者认为发送数据失败会自动重发数据。
当leader宕机时,可能丢失数据。
0:生产者不停向leader发送数据,而不需要leader反馈成功消息。
这种模式效率最高,可靠性最低。可能在发送过程中丢失数据,也可能在leader宕机时丢失数据。
-1:生产者发送数据给leader,leader收到数据后要等到ISR列表中的所有副本都同步数据完成后,才向生产者发送成功消息,如果一只收不到成功消息,则认为发送数据失败会自动重发数据。
这种模式下可靠性很高,但是当ISR列表中只剩下leader时,当leader宕机让然有可能丢数据。
此时可以配置min.insync.replicas指定要求观察ISR中至少要有指定数量的副本,默认该值为1,需要改为大于等于2的值
这样当生产者发送数据给leader但是发现ISR中只有leader自己时,会收到异常表明数据写入失败,此时无法写入数据,保证了数据绝对不丢。
虽然不丢但是可能会产生冗余数据,例如生产者发送数据给leader,leader同步数据给ISR中的follower,同步到一半leader宕机,此时选出新的leader,可能具有部分此次提交的数据,而生产者收到失败消息重发数据,新的leader接受数据则数据重复了。
当leader宕机时会选择ISR中的一个follower成为新的leader,如果ISR中的所有副本都宕机,怎么办?
有如下配置可以解决此问题:
unclean.leader.election.enable=false
策略1:必须等待ISR列表中的副本活过来才选择其成为leader继续工作。
unclean.leader.election.enable=true
策略2:选择任何一个活过来的副本,成为leader继续工作,此follower可能不在ISR中。
策略1,可靠性有保证,但是可用性低,只有最后挂了leader活过来kafka才能恢复。
策略2,可用性高,可靠性没有保证,任何一个副本活过来就可以继续工作,但是有可能存在数据不一致的情况。
At most once:消息可能会丢,但绝不会重复传输。
At least once:消息绝不会丢,但可能会重复传输。
Exactly once:每条消息肯定会被传输一次且仅传输一次。
kafka最多保证At least once,可以保证不丢,但是可能会重复,为了解决重复需要引入唯一标识和去重机制,kafka提供了GUID实现了唯一标识,但是并没有提供自带的去重机制,需要开发人员基于业务规则自己去重。
ISR:与leader保持同步的follower集合
AR:分区的所有副本
LEO:没个副本的最后条消息的offset
HW:一个分区中所有副本最小的offset
每个分区内,每条消息都有一个offset,故只能保证分区内有序。
拦截器 -> 序列化器 -> 分区器
正确
offset+1
先提交offset,后消费,有可能造成数据的重复
1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first
2)触发Controller的监听程序
3)kafka Controller 负责topic的创建工作,并更新metadata cache
可以增加
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
不可以减少,被删除的分区数据难以处理。
__consumer_offsets,保存消费者offset
一个topic多个分区,一个消费者组多个消费者,故需要将分区分配个消费者(roundrobin、range)
每个分区对应一个文件夹,文件夹的命名为topic-0,topic-1,内部为.log和.index文件
负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
partition leader(ISR),controller(先到先得)
不能及时与leader同步,暂时踢出ISR,等其追上leader之后再重新加入
分区,顺序写磁盘,0-copy
server.properties中所有配置参数说明(解释)如下列表:
参数 | 说明(解释) |
broker.id =0 | 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况 |
log.dirs=/data/kafka-logs | kafka数据的存放地址,多个地址的话用逗号分割/data/kafka-logs-1,/data/kafka-logs-2 |
port =9092 | broker server服务端口 |
message.max.bytes =6525000 | 表示消息体的最大大小,单位是字节 |
num.network.threads =4 | broker处理消息的最大线程数,一般情况下不需要去修改 |
num.io.threads =8 | broker处理磁盘IO的线程数,数值应该大于你的硬盘数 |
background.threads =4 | 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改 |
queued.max.requests =500 | 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。 |
host.name | broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置 |
socket.send.buffer.bytes=100*1024 | socket的发送缓冲区,socket的调优参数SO_SNDBUFF |
socket.receive.buffer.bytes =100*1024 | socket的接受缓冲区,socket的调优参数SO_RCVBUFF |
socket.request.max.bytes =100*1024*1024 | socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 |
log.segment.bytes =1024*1024*1024 | topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖 |
log.roll.hours =24*7 | 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖 |
log.cleanup.policy = delete | 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖 |
log.retention.minutes=3days | 数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据 log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖 |
log.retention.bytes=-1 | topic每个分区的最大文件大小,一个topic的大小限制 =分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖 |
log.retention.check.interval.ms=5minutes | 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略 |
log.cleaner.enable=false | 是否开启日志压缩 |
log.cleaner.threads = 2 | 日志压缩运行的线程数 |
log.cleaner.io.max.bytes.per.second=None | 日志压缩时候处理的最大大小 |
log.cleaner.dedupe.buffer.size=500*1024*1024 | 日志压缩去重时候的缓存空间,在空间允许的情况下,越大越好 |
log.cleaner.io.buffer.size=512*1024 | 日志清理时候用到的IO块大小一般不需要修改 |
log.cleaner.io.buffer.load.factor =0.9 | 日志清理中hash表的扩大因子一般不需要修改 |
log.cleaner.backoff.ms =15000 | 检查是否处罚日志清理的间隔 |
log.cleaner.min.cleanable.ratio=0.5 | 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖 |
log.cleaner.delete.retention.ms =1day | 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖 |
log.index.size.max.bytes =10*1024*1024 | 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖 |
log.index.interval.bytes =4096 | 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数 |
log.flush.interval.messages=None | log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失. |
log.flush.scheduler.interval.ms =3000 | 检查是否需要固化到硬盘的时间间隔 |
log.flush.interval.ms = None | 仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发. |
log.delete.delay.ms =60000 | 文件在索引中清除后保留的时间一般不需要去修改 |
log.flush.offset.checkpoint.interval.ms =60000 | 控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改 |
auto.create.topics.enable =true | 是否允许自动创建topic,若是false,就需要通过命令创建topic |
default.replication.factor =1 | 是否允许自动创建topic,若是false,就需要通过命令创建topic |
num.partitions =1 | 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖 |
|
|
以下是kafka中Leader,replicas配置参数 |
|
controller.socket.timeout.ms =30000 | partition leader与replicas之间通讯时,socket的超时时间 |
controller.message.queue.size=10 | partition leader与replicas数据同步时,消息的队列尺寸 |
replica.lag.time.max.ms =10000 | replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中 |
replica.lag.max.messages =4000 | 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效 ##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后 ##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移 ##到其他follower中. ##在broker数量较少,或者网络不足的环境中,建议提高此值. |
replica.socket.timeout.ms=30*1000 | follower与leader之间的socket超时时间 |
replica.socket.receive.buffer.bytes=64*1024 | leader复制时候的socket缓存大小 |
replica.fetch.max.bytes =1024*1024 | replicas每次获取数据的最大大小 |
replica.fetch.wait.max.ms =500 | replicas同leader之间通信的最大等待时间,失败了会重试 |
replica.fetch.min.bytes =1 | fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件 |
num.replica.fetchers=1 | leader进行复制的线程数,增大这个数值会增加follower的IO |
replica.high.watermark.checkpoint.interval.ms =5000 | 每个replica检查是否将最高水位进行固化的频率 |
controlled.shutdown.enable =false | 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker |
controlled.shutdown.max.retries =3 | 控制器关闭的尝试次数 |
controlled.shutdown.retry.backoff.ms =5000 | 每次关闭尝试的时间间隔 |
leader.imbalance.per.broker.percentage =10 | leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡 |
leader.imbalance.check.interval.seconds =300 | 检查leader是否不平衡的时间间隔 |
offset.metadata.max.bytes | 客户端保留offset信息的最大空间大小 |
kafka中zookeeper参数配置 |
|
zookeeper.connect = localhost:2181 | zookeeper集群的地址,可以是多个,多个之间用逗号分割hostname1:port1,hostname2:port2,hostname3:port3 |
zookeeper.session.timeout.ms=6000 | ZooKeeper的最大超时时间,就是心跳的间隔, 若是没有反映,那么认为已经死了,不易过大 |
zookeeper.connection.timeout.ms =6000 | ZooKeeper的连接超时时间 |
zookeeper.sync.time.ms =2000 | ZooKeeper集群中leader和follo |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。