当前位置:   article > 正文

Kafka 原理_kafka 副本因子

kafka 副本因子

Kafka 原理

一、生产者原理

1.1、消息发送整体流程

主要有两个线程,分别是main线程和sender线程
KafkaProducer —>ProducerInterceptor—>Serializer—>Partitioner 这个流程都是main线程,然后是 sender线程发送到broker上
批量发送:是提高消息吞吐量重要的方式, Producer 端可以在内存中合并多条消息后, 以一次请求的方式发送了批量的消息给 broker,从而大大减少 broker 存储消息的 IO 操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。
压缩:Producer 端可以通过 GZIP 或 Snappy 格式对消息集合进行压缩。 Producer 端进行压缩之后,在Consumer 端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传
输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是 CPU(压缩和解压会耗掉部分 CPU 资源)。

1.2、拦截器

可以在生产者的属性中指定多个拦截器,形成拦截器链,实现消息的定制化

1.3、序列化器

第二步就是对key和value进行序列化。
在这里插入图片描述
除了上面的自带的序列化工具,还可以指定如:Avro、JSON、Protobuf,或者自定义类型序列化器。

1.4、路由指定(分区器)

  • 1、指定了partition——用它;
  • 2、没有指定partition,自定义了分区器——按自定义的规则来;
  • 3、没有指定partition,没有自定义分区器,但是key不为空——hash以后取余;
  • 4、没有指定partition,没有自定义分区器,但是key是空的——整数自增取模。

1.5、数据可靠性保证ACK

生产者发送消息不是发送出去就完事了,服务端要有一种响应方式,只有在服务端确认之后,生产者才可以发送下一轮,否则重新发送消息。

正常和leader保持同步的副本放到一个动态的set里面这个就是ISR:in-sync replica set ,现在只要ISR:in-sync replica set 里面的follower 同步完之后给客户端发送ACK。

什么时候移出ISR?

多久没有向leader同步数据就会被移除ISR,由参数replica.lag.time.max.ms 决定,默认值30s

服务端ACK

pros.put("acks","0"); // 不等待ACK
pros.put("acks","1"); // (默认)leader落盘返回ACK
pros.put("acks","-1"); // 或者all。 leader和全部follower落盘返回ACK
  • 1
  • 2
  • 3
  • 0:不等broker确认,提供了最低的延迟,当broker故障时有可能丢失数据
  • 1:默认,partition的leader落盘成功后返回ACK,如果follower同步成功之前leader故障,有可能丢失数据
  • -1:all,等待leader和follower全部落盘成功之后才返回ACK,broker发送ACK之前leader故障,没有给生产者发送ACK,那么会造成数据重复。

三种机制,性能依次递减(吞吐量减少),数据健壮性依次递增,可以根据业务场景使用不同的参数。

二:Broker存储原理

为了实现横向扩展,同时降低单台的负载,把topic分成多个partition,一个partition消息是顺序写入,但是全局不一定有序。

在服务器上每个partition都有一个物理目录,topic名字后面的数字标号代表分区。

为了提高分区的高可用,又增加了副本机制,创建topic的时候通过指定replication-factor 确定副本数,副本数要小于等于broker的节点数,否则就会报错。副本的leader提供读写服务,follower唯一的任务就是从leader拉取数据(冗余)

2.1、为什么不像Mysql主从复制,实现读写分离?

这个是设计思想的不同,读写都发生在leader节点,就不存在读写分离带来的读写一致性问题,这个是单调读一致性。

2.2、副本分配规则

  • 1)副本因子不能大于Broker的个数;
  • 2)第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
  • 3)其他分区的第一个副本放置位置相对于第0个分区依次往后(nextReplicaShift)

2.3、segment

一个分区是不是只有一个文件,或者消息文件是不是会无限变大?

为了防止log 不断追加导致文件过大,导致检索效率下降,一个partition被分成多个segment来组织数据,在磁盘上segment由一个log文件和2个index文件组成。
在这里插入图片描述

  • log.segment.bytes //segment默认大小1G,由这个参数控制
  • log.roll.hours
    //消息的最大时间戳和当前时间戳的差值,默认一周,如果旧的segment是一周之前写的,现在创建新的segment
  • log.roll.ms //更加精细的时间单位,毫秒级别的日志切割
  • log.index.size.max.bytes //offset索引文件大小,默认10M,索引写满了,数据文件也要跟着拆分,不然对不上

2.3.1、segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。

kafka必须由快速检索消息的机制,这个就是索引,在kafka中设计了两种索引

  • .index 偏移量(offset)索引文件
  • .timeindex 时间戳(timestamp)索引文件

2.3.2、index 采用稀疏索引

稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的操作就不需要操作磁盘 IO。mmap的 Java 实现对应 MappedByteBuffer 。

Kafka 充分利用二分法来查找对应 offset 的消息位置:

  • 按照二分法找到小于 offset 的 segment 的.log 和.index
  • 用目标 offset 减去文件名中的 offset 得到消息在这个 segment 中的偏移量。
  • 再次用二分法在 index 文件中找到对应的索引。
  • 到 log 文件中,顺序查找,直到找到 offset 对应的消息。

2.4、存储总结
在这里插入图片描述

2.5、消息清理策略

kafka提供了两种方式

  • 直接删除delete:默认
  • 日志进行压缩
开关:log.cleaner.enable = true
策略:log.cleanup.policy = delete / compact
  • 1
  • 2

删除策略

日志删除是通过定时任务完成的,默认5分钟执行一次,看看有没有删除的数据

周期:log.retention.check.interval.ms=300000
  • 1

删除老的日志,由一个参数控制过期定义,默认一周,也就是时间戳超过一周的数据才会删除

过期定义:
 log.retention.hours 
 log.retention.minutes 
 log.retention.ms
  • 1
  • 2
  • 3
  • 4

默认优先级:ms>minutes>hours

还有一种针对生产消息不均匀的情况,根据日志大小删除,先删除旧的日志,删除到不超过这个 log.retention.bytes 大小为止,默认-1 不限制,也可以针对单个segment文件大小限制 log.segment.bytes 默认1G

压缩策略

这种是不删除,而是对日志进行压缩,压缩就是把相同的key合并为最后一个value,这个普通的压缩不一样,理解为压紧更加合适 。

2.6、高可用架构

2.6.1、controller选举

当创建一个分区或者分区增加副本的时候,都要从所有的副本中选举一个新的leader出来。当leader挂了之后,并不是所有的副本都参与leader 的选举,而是由其中一个broker统一来指挥,这个broker的角色就是controller(控制器),这样就可以避免了分区和副本过多,如果所有的都参与选举的话造成大量的watch事件被触发,导致zk负载过重。

controller:所有的broker会尝试在zk创建临时节点/controller,只有一个可以成功(先到先得),如果controller挂掉了,zk上的节点消失,其他的broker通过watch监听到controller下线了,开始竞争新的controller,方法还是一样。

controller节点:

  • 监听 (broker变化,topic变化、partition变化)
  • 获取和管理broker,topic、partition的信息
  • 管理partition leader、follower主从信息

2.6.2、分区副本选举

正常和leader保持同步的副本放到一个动态的set里面这个就是ISR:in-sync replica set

默认情况下,当leader副本挂掉之后,只有在ISR集合的副本才有资格被选举为新的leader。如果ISR为空,可以让ISR之外的副本参与选举。这个叫unclean leader election。

kafka没有使用zab、raft等共识算法,而是自己实现一种 :默认是ISR中第一个副本变成leader,leader确认之后,客户端读写只能操作leader,follower需要向leader同步数据

2.7、主从如何同步

这里有两个概念:

  • LEO(Log End Offset):下一条等待写入的消息的offset(最新的offset + 1)
  • HW(Hign Watermark):ISR中最小的LEO。

2.7.1、主从保持同步过程

  • 1、Follower节点会向Leader发送一个fetch请求,leader向follower发送数据后,需要更新follower的LEO。
  • 2、follower接收到数据响应后,依次写入消息并且更新LEO。
  • 3、Leader更新HW(ISR最小的LEO)

2.7.2、follower挂掉

follower 恢复之后:

  • 1、找到之前的记录的HW,把高于HW的消息截掉
  • 2、然后向leader同步消息,追上leader之后(30),重新加上ISR

2.7.3、leader挂掉

  • 1、首先选一个leader(ISR中第一个)
  • 2、 其他follower把高于HW的消息截掉,然后同步

Partition Replicates副本选举机制:
controller感知到分区leader所在的broker挂了(controller监听了很多zk节点可以感知到broker存活),controller会从每个parititon的replicas副本列表中取出第一个broker作为leader,当然这个broker需要也同时在ISR列表里。

注意这种只保证副本之间数据的一致性,不能保证数据不丢失或者不重复

三:消费者原理

Pull模式consumer 可以自主决定是否批量的从 broker 拉取数据。Push 模式必须在不知道consumer 消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。

kafka采用的是pull模式:

  • 根据consumer的消费能力进行数据拉取,可以控制速率
  • 可以批量拉取,也可以单条
  • 可以设置不同的提交方式

缺点:kafka没有数据会导致consumer空轮询,解决:通过参数设置consumer拉取数据为空进行阻塞

Consumer Group :同一 Consumer Group 中的多个 Consumer 实例,不同时消费同一个 partition,等效于队列模式。 partition 内消息是有序的, Consumer 通过 pull 方式消费消息。 Kafka 不
删除已消费的消息对于 partition,顺序读写磁盘数据,以时间复杂度 O(1)方式提供消息持久化能力。

对于一个partition,消费者组怎么才能做到接着上次消费的位置(offset)继续消费?肯定要把对应的关系保存起来。

offset的存储(offset在partition中是连续的,而不是全局连续编号):
在这里插入图片描述

3.1、对应关系保存在哪里

首先肯定不是保存在消费者本地,因为所有的消费者都可以使用这个consumer_group_id,放在本地做不到统一维护,所以放在服务端。早期版本维护在zk中,但是读写性能消耗太大了, 后来放在一个特殊的topic中,名字是_consumer_offsets,默认有50分区,每个分区默认一个副本。

3.2、_consumer_offsets 存储结构

topic里面是可以存放对象类型的value(序列化),主要存储两种对象:

  • GroupMetadata:保存了消费者组中各个消费者的信息(每个消费者有编号)。

  • OffsetAndMetadata:保存了消费者组和各个partition的offset位移信息元数据
    在这里插入图片描述
    3.3、如果找不到offset

  • latest (默认值),也就是从最新的消息开始消费,历史消息是不能消费

  • earliest,从最早的(最先发送)的消息开始消费,历史消息是可以消费

  • none,如果consumer_group 在服务端找不到offset 会报错

3.4、offset什么时候更新

消费者必须要有一个commit动作,offset才会更新,消费者可以自动提交或者手动提交,由消费端这个参数控制
enable.auto.commit=true/false,默认true。
还可以使用一个参数控制自动提交的频率: auto.commit.interval.ms=5000默认5s

如果需要业务逻辑处理之后才commit,应该改为false,这样消费者必须要调用一个方法让broker更新offset

  • consumer.commitSync():手动同步提交
  • consumer.commitASync():手动异步提交

如果不提交或者提交失败,broker的offset 不会更新,消费者组下次消费的时候会消费到重复的消息。

3.5、消费策略

多个consumer group和一个partition的关系:重复消费

一个cg中的消费者数量和分区相等? 消费者比分区少怎么分配? 消费者比分区多怎么分配?

  • 范围分配
  • 轮询分配
  • 粘带分配
    • 分区的分配尽可能均匀
    • 分区的分配尽可能和上次分配保存相同

还有assign接口 可以手动指定消费分区 ,相当于consumer_group_id失效,在第一次消费者启动的时候,消费关系就已经确定,如果分配策略没变关系是不会变动的。

3.6、Rebalance 分区重新分配

Rebalance 触发:

  • consumer group 中成员个数发生变化
  • consumer消费超时
  • group 订阅的topic个数发生变化
  • group 订阅的topic的分区数发生变化

分区Rebalance 重新分配流程:

  • 1、找到话事人,每个broker上都有一个管理消费offset、消费者组的实例(GroupCoordinator),第一步就是要从所有的GroupCoordinator中找到一个话事人出来,通常是partition的leader节点所在的broker节点。
  • 2、GroupCoordinator 通过心跳通知所有的consumer进行Rebalance
  • 3、清点人数,让所有的消费者连到GroupCoordinator报数,这个就是join group请求
  • 4、选组长,GroupCoordinator从所有的消费者中选一个leader,这个消费者会根据消费者的情况和设置的策略,确定一个分配方案
  • 5、leader把方案上报给GroupCoordinator,GroupCoordinator在把方案通知所有的消费者
  • 6、完成Rebalance

第一阶段:选择组协调器
组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。consumer group中的每个consumer启动时会向kafka集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。

组协调器选择方式:通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumer group的coordinator
公式:hash(consumer group id) % __consumer_offsets主题的分区数

第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。然后GroupCoordinator从一个consumer group中选择第一个加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案(由于rebalance等策略有客户端配置决定,因此分区方案需要consumer来制定,以消费组协调器的配置为准)。

第三阶段:SYNC GROUP
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。

消费者Rebalance分区分配策略
主要有三种rebalance的策略:range、round-robin、sticky。 Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费者与订阅主题之间的分区分配策略。

如果C1消费消息超时,触发Rebalance ,重新分配后,该消息会被其他的消费者C2消费,此时C1消费完成提交offset会导致错误?

解决:GroupCoordinator 每次Rebalance ,会标记一个 generation 给到consumer,没有Rebalance 这个generation 就会+1,consumer提交offset时,GroupCoordinator 会对比generation 值,不一致拒绝提交,这个就是版本号或者epoch的思想

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/742131
推荐阅读
相关标签
  

闽ICP备14008679号