赞
踩
一、生产者原理
1.1、消息发送整体流程
主要有两个线程,分别是main线程和sender线程
KafkaProducer —>ProducerInterceptor—>Serializer—>Partitioner 这个流程都是main线程,然后是 sender线程发送到broker上
批量发送:是提高消息吞吐量重要的方式, Producer 端可以在内存中合并多条消息后, 以一次请求的方式发送了批量的消息给 broker,从而大大减少 broker 存储消息的 IO 操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。
压缩:Producer 端可以通过 GZIP 或 Snappy 格式对消息集合进行压缩。 Producer 端进行压缩之后,在Consumer 端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传
输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是 CPU(压缩和解压会耗掉部分 CPU 资源)。
1.2、拦截器
可以在生产者的属性中指定多个拦截器,形成拦截器链,实现消息的定制化
1.3、序列化器
第二步就是对key和value进行序列化。
除了上面的自带的序列化工具,还可以指定如:Avro、JSON、Protobuf,或者自定义类型序列化器。
1.4、路由指定(分区器)
1.5、数据可靠性保证ACK
生产者发送消息不是发送出去就完事了,服务端要有一种响应方式,只有在服务端确认之后,生产者才可以发送下一轮,否则重新发送消息。
正常和leader保持同步的副本放到一个动态的set里面这个就是ISR:in-sync replica set ,现在只要ISR:in-sync replica set 里面的follower 同步完之后给客户端发送ACK。
什么时候移出ISR?
多久没有向leader同步数据就会被移除ISR,由参数replica.lag.time.max.ms
决定,默认值30s。
服务端ACK
pros.put("acks","0"); // 不等待ACK
pros.put("acks","1"); // (默认)leader落盘返回ACK
pros.put("acks","-1"); // 或者all。 leader和全部follower落盘返回ACK
三种机制,性能依次递减(吞吐量减少),数据健壮性依次递增,可以根据业务场景使用不同的参数。
二:Broker存储原理
为了实现横向扩展,同时降低单台的负载,把topic分成多个partition,一个partition消息是顺序写入,但是全局不一定有序。
在服务器上每个partition都有一个物理目录,topic名字后面的数字标号代表分区。
为了提高分区的高可用,又增加了副本机制,创建topic的时候通过指定replication-factor
确定副本数,副本数要小于等于broker的节点数,否则就会报错。副本的leader提供读写服务,follower唯一的任务就是从leader拉取数据(冗余)
2.1、为什么不像Mysql主从复制,实现读写分离?
这个是设计思想的不同,读写都发生在leader节点,就不存在读写分离带来的读写一致性问题,这个是单调读一致性。
2.2、副本分配规则
2.3、segment
一个分区是不是只有一个文件,或者消息文件是不是会无限变大?
为了防止log 不断追加导致文件过大,导致检索效率下降,一个partition被分成多个segment来组织数据,在磁盘上segment由一个log文件和2个index文件组成。
2.3.1、segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
kafka必须由快速检索消息的机制,这个就是索引,在kafka中设计了两种索引
2.3.2、index 采用稀疏索引
稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的操作就不需要操作磁盘 IO。mmap的 Java 实现对应 MappedByteBuffer 。
Kafka 充分利用二分法来查找对应 offset 的消息位置:
2.4、存储总结
2.5、消息清理策略
kafka提供了两种方式
开关:log.cleaner.enable = true
策略:log.cleanup.policy = delete / compact
删除策略
日志删除是通过定时任务完成的,默认5分钟执行一次,看看有没有删除的数据
周期:log.retention.check.interval.ms=300000
删除老的日志,由一个参数控制过期定义,默认一周,也就是时间戳超过一周的数据才会删除
过期定义:
log.retention.hours
log.retention.minutes
log.retention.ms
默认优先级:ms>minutes>hours
还有一种针对生产消息不均匀的情况,根据日志大小删除,先删除旧的日志,删除到不超过这个 log.retention.bytes
大小为止,默认-1 不限制,也可以针对单个segment文件大小限制 log.segment.bytes
默认1G
压缩策略
这种是不删除,而是对日志进行压缩,压缩就是把相同的key合并为最后一个value,这个普通的压缩不一样,理解为压紧更加合适 。
2.6、高可用架构
2.6.1、controller选举
当创建一个分区或者分区增加副本的时候,都要从所有的副本中选举一个新的leader出来。当leader挂了之后,并不是所有的副本都参与leader 的选举,而是由其中一个broker统一来指挥,这个broker的角色就是controller(控制器),这样就可以避免了分区和副本过多,如果所有的都参与选举的话造成大量的watch事件被触发,导致zk负载过重。
controller:所有的broker会尝试在zk创建临时节点/controller,只有一个可以成功(先到先得),如果controller挂掉了,zk上的节点消失,其他的broker通过watch监听到controller下线了,开始竞争新的controller,方法还是一样。
controller节点:
2.6.2、分区副本选举
正常和leader保持同步的副本放到一个动态的set里面这个就是ISR:in-sync replica set
默认情况下,当leader副本挂掉之后,只有在ISR集合的副本才有资格被选举为新的leader。如果ISR为空,可以让ISR之外的副本参与选举。这个叫unclean leader election。
kafka没有使用zab、raft等共识算法,而是自己实现一种 :默认是ISR中第一个副本变成leader,leader确认之后,客户端读写只能操作leader,follower需要向leader同步数据
2.7、主从如何同步
这里有两个概念:
2.7.1、主从保持同步过程
2.7.2、follower挂掉
follower 恢复之后:
2.7.3、leader挂掉
Partition Replicates副本选举机制:
controller感知到分区leader所在的broker挂了(controller监听了很多zk节点可以感知到broker存活),controller会从每个parititon的replicas副本列表中取出第一个broker作为leader,当然这个broker需要也同时在ISR列表里。
注意这种只保证副本之间数据的一致性,不能保证数据不丢失或者不重复
三:消费者原理
Pull模式consumer 可以自主决定是否批量的从 broker 拉取数据。Push 模式必须在不知道consumer 消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。
kafka采用的是pull模式:
缺点:kafka没有数据会导致consumer空轮询,解决:通过参数设置consumer拉取数据为空进行阻塞
Consumer Group :同一 Consumer Group 中的多个 Consumer 实例,不同时消费同一个 partition,等效于队列模式。 partition 内消息是有序的, Consumer 通过 pull 方式消费消息。 Kafka 不
删除已消费的消息对于 partition,顺序读写磁盘数据,以时间复杂度 O(1)方式提供消息持久化能力。
对于一个partition,消费者组怎么才能做到接着上次消费的位置(offset)继续消费?肯定要把对应的关系保存起来。
offset的存储(offset在partition中是连续的,而不是全局连续编号):
3.1、对应关系保存在哪里
首先肯定不是保存在消费者本地,因为所有的消费者都可以使用这个consumer_group_id,放在本地做不到统一维护,所以放在服务端。早期版本维护在zk中,但是读写性能消耗太大了, 后来放在一个特殊的topic中,名字是_consumer_offsets,默认有50分区,每个分区默认一个副本。
3.2、_consumer_offsets 存储结构
topic里面是可以存放对象类型的value(序列化),主要存储两种对象:
GroupMetadata:保存了消费者组中各个消费者的信息(每个消费者有编号)。
OffsetAndMetadata:保存了消费者组和各个partition的offset位移信息元数据
3.3、如果找不到offset
latest (默认值),也就是从最新的消息开始消费,历史消息是不能消费
earliest,从最早的(最先发送)的消息开始消费,历史消息是可以消费
none,如果consumer_group 在服务端找不到offset 会报错
3.4、offset什么时候更新
消费者必须要有一个commit动作,offset才会更新,消费者可以自动提交或者手动提交,由消费端这个参数控制
enable.auto.commit=true/false
,默认true。
还可以使用一个参数控制自动提交的频率: auto.commit.interval.ms=5000
默认5s
如果需要业务逻辑处理之后才commit,应该改为false,这样消费者必须要调用一个方法让broker更新offset
如果不提交或者提交失败,broker的offset 不会更新,消费者组下次消费的时候会消费到重复的消息。
3.5、消费策略
多个consumer group和一个partition的关系:重复消费
一个cg中的消费者数量和分区相等? 消费者比分区少怎么分配? 消费者比分区多怎么分配?
还有assign接口 可以手动指定消费分区 ,相当于consumer_group_id失效,在第一次消费者启动的时候,消费关系就已经确定,如果分配策略没变关系是不会变动的。
3.6、Rebalance 分区重新分配
Rebalance 触发:
分区Rebalance 重新分配流程:
第一阶段:选择组协调器
组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。consumer group中的每个consumer启动时会向kafka集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。
组协调器选择方式:通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumer group的coordinator
公式:hash(consumer group id) % __consumer_offsets主题的分区数
第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。然后GroupCoordinator从一个consumer group中选择第一个加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案(由于rebalance等策略有客户端配置决定,因此分区方案需要consumer来制定,以消费组协调器的配置为准)。
第三阶段:SYNC GROUP
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。
消费者Rebalance分区分配策略
主要有三种rebalance的策略:range、round-robin、sticky。 Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。
如果C1消费消息超时,触发Rebalance ,重新分配后,该消息会被其他的消费者C2消费,此时C1消费完成提交offset会导致错误?
解决:GroupCoordinator 每次Rebalance ,会标记一个 generation 给到consumer,没有Rebalance 这个generation 就会+1,consumer提交offset时,GroupCoordinator 会对比generation 值,不一致拒绝提交,这个就是版本号或者epoch的思想
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。