赞
踩
Apache Kafka是一个开放源代码的分布式事件流平台,成千上万的公司使用它来实现高性
能数据管道,流分析,数据集成和关键任务等相关的应用程序。
Producer:生产者可以将数据发布到所选择的topic(主题)中。生成者负责将记录分配到topic的哪一个分区(partition)中,这里可以使用对多个partition循环发送来实现多个server负载均衡
Broker:日志的分区(partition)分布在Kafka集群的服务器上。每个服务器处理数据和请求时,共享这些分区。每一个分区都会在以配置的服务器上进行备份,确保容错性。
其中,每个分区都有一台server作为leader,零台或堕胎server作为follows。leader server处理一切对分区的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers中的一台server会自动成为新的eader,每台server都会成为某些分区的leader和某些分区的follower,因此集群的负载是均衡的
Consumer:消费者使用一个group(消费组)名称来表示,发布到topic中的每条记录将被分配到订阅消费组中的其中一个消费者示例。消费者实例可以分布在多个进程中或多个机器上
这里有两个注意的地方:
基本流程:
ProducerBatch
批量的进行消息发送到Sender线程处理(这里为了提高发送效率,减少带宽),ProducerBatch中就是我们需要发送的消息,其中消息累加器中可以使用Buffer.memory
配置,默认为32MBSelector
,Selector发送消息到Kafka集群
基本流程:
Consumer Group中的Consumer向各自注册的分区上进行消费消息
Consumer消费消息后会将当前标注的消费位移信息以消息的方式提交到位移主题中记录,一个Consumer Group中多个Consumer会做负载均衡,如果一个Consumer宕机,会自动切换到组内别的Consumer进行消费
关键的点:
Consumer Group:组内多个的Consumer可以公用一个Consumer Id,组内所有的Consumer只能注册到一个分区上去消费,一个Consumer Group只能到一个Topic上去消费
位移主题:
位移主题的主要作用是保存Kafka消费者的位移信息
Kafka老版本之前:
在Kafka老版本之前处理方式是自动或手动地将位移数据提交到Zookeeper进行保存,Consumer重启后,自动从Zookeeper中读取消费位移信息,从而在上次的offset地方继续消费
优点: Kafka Broker中不需要保存位移数据,减少了Broker端需要持有的状态信息,有利于动态扩展
缺点: 每一个Consumer消费后需要发送位移信息到Zookeeper,而Zooker不适用于这种高频的写操作
Kafka最新版本中位移主题的处理方式:
Consumer的位移信息offset会当作一条条普通消息提交到位移主题(_consumer_offsets)中。
window文件系统中的文件列表:
这里比较好理解:
一些场景需要保证多个消息的消费顺序,比如订单,但在kafka中一个消息可能被发到多个partition中多个线程处理,被多个消费者消费,无法保证消息的消费顺序
解决方案:将需要顺序消费的消息发送的时候设置将某个topic发送到指定的partition(也可以根据key的hash与分区进行运算),则在partition中的消息也是有序的,消费的时候将一组同hash的key放到同一个queue中保证同一个消费者下的同一个线程对此queue进行消费。
总结:一个producer->一个partition->一个queue->一个comsumer->一个线程
当对于需要顺序消费的消息数量大的时候,无法保证吞吐量
AR(Assigned Replicas):分区中的所有副本统称为AR。所有消息会先发送到leader副本,然后follower副本才能从leader中拉取消息进行同步。但是在同步期间,follower对于leader而言会有一定程度的滞后,这个时候follower和leader并非完全同步状态
OSR(Out Sync Replicas):follower副本与leader副本没有完全同步或滞后的副本集合
ISR(In Sync Replicas):AR中的一个子集,ISR中的副本都是与leader保持完全同步的副本,如果某个在ISR中的follower副本落后于leader副本太多,则会被从ISR中移除,否则如果完全同步,会从OSR中移至ISR集合。
在默认情况下,当leader副本发生故障时,只有在ISR集合中的follower副本才有资格被选举为新leader,而OSR中的副本没有机会(可以通过unclean.leader.election.enable
进行配置)
HW(High Watermark):高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个水位 offset 之前的消息
LEO(Log End Offset):标识当前日志文件中下一条待写入的消息的offset。在ISR集合中的每个副本都会维护自身的LEO,且HW==LEO。
图中,HW就是8,Consumer只能拉去0~7的消息,LEO就是15,代表消息还没有同步到follower
下面通过一个例子来说明下ISR、HW、LEO之间的关系:
假设由一个leader副本,它有两个follower副本,这时候producer向leader写入3、4两条消息,我们来观察下他们是如何同步的
这个时候写入两条消息到leader,这个时候LEO变为5,然后follower开始同步leader数据
由于网络或其它原因,follower2同步效率较低,还没有完成同步,这个时候HW的offset为4,在此offset之前的消息Consumer都可见
在一定的延迟后,follower2也完成了队leader副本的同步,这时HW为5,LEO为5,且两个follower副本都在ISR集合中,在leader或follower宕机后,会在ISR集合的副本中选举一个来当新的leader副本
HW高水位的弊端:
消息1
消息丢失的过程(min.insync.replicas=1):在kafka 0.11.0.0版本中引入Leader Epoch来解决使用高水位导致的数据丢失和数据不一致的问题
所谓leader epoch实际上是一对值:(epoch,offset),epoch标识leader的版本号,从0开始,每变更一次leader,epoch+1;而offset对应于该epoch版本的leader写入第一条消息(成为leader后的首条消息)的位移
(0,0)、(1,120)表示第一个leader从位移0开始写入消息,共写了120条,第二个leader版本号为1,从位移120处开始写入消息
规避数据丢失(图片来源网络):
规避数据不一致(图片来源网络):
顺序写入:顺序写入与随机写入速度相差高达6000倍
批量处理:使用消息累加器仅多个消息批量发送,既节省带宽有提高了发送速度
消息压缩:kafka支持队消息压缩,支持格式有:gzip、snapply、lz4,可以使用compression.type
配置
页缓存:在消息发送后,并没有等到消息写入磁盘后才返回,而是到page Cache中就返回。page Cache与文件系统的写入由操作系统自动完成
零拷贝(zero-copy):Kafka两个重要过程都使用了零拷贝技术,且都是操作系统层面的狭义零拷贝,一是Producer生产的数据存到broker,二是 Consumer从broker读取数据。
正常的非零拷贝的数据拷贝过程:
硬盘—>内核缓冲区—>用户缓冲区—>内核socket缓冲区—>协议引擎
Producer生产的数据持久化到broker,采用mmap文件映射
,实现顺序的快速写入;
硬盘—>内核缓冲区—>共享到用户空间缓存,共享而不是复制
Customer从broker读取数据,采用sendfile
,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。 sendfile() 只是适用于应用程序地址空间不需要对所访问数据进行处理的情况
硬盘—>内核缓冲区—>内核socket缓冲区—>协议引擎
名称 | 描述 | 类型 | 默认值 | 配置示例 |
---|---|---|---|---|
log.dir | 保存日志数据的目录(对log.dirs属性的补充) | string | /tmp/kafka-logs | |
log.dirs | 保存日志数据的目录,如果未设置将使用log.dir的配置 | string | null | log.dirs=/home/kafka1,/home/kafka2,/home/kafka3 |
zookeeper.connect | Zookeeper主机地址 | string | zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1 | |
listeners | 监听器列表 - 使用逗号分隔URI列表和监听器名称。如果侦听器名称不是安全协议,则还必须设置listener.security.protocol.map。指定主机名为0.0.0.0来绑定到所有接口。留空则绑定到默认接口上。 | string | null | 合法监听器列表的示例:PLAINTEXT:// myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION:// localhost:9093 |
listener.security.protocol.map | 侦听器名称和安全协议之间的映射。 | string | PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL | |
auto.create.topics.enable | 是否允许在服务器上自动创建topic | boolean | true | 推荐为false |
unclean.leader.election.enable | 指定副本是否能够不再ISR中被选举为leader,即使这样可能会丢失数据 | boolean | false | 推荐为true |
auto.leader.rebalance.enable | 是否允许leader平衡。后台线程会定期检查并触发leader平衡 | boolean | true | 推荐为true |
log.retention.{hours | minutes | ms} | 日志删除的时间阈值(时、分、毫秒) | int |
log.rentention.bytes | 日志删除的大小阈值 | long | -1 | -1,表示没有限制 |
message.max.bytes | kafka允许的最大的一个批次消息大小 | int | 1000012=976KB |
名称 | 描述 | 类型 | 默认值 | 配置示例 |
---|---|---|---|---|
retention.ms | 规定了该topic消息被保存的时长 | long | 604800000 | |
retention.bytes | 规定了要为该 Topic 预留多大的磁盘空间 | long | -1 | |
max.message.bytes Kafka | Kafka允许接收该topic最大消息大小 | int | 1000012=976KB |
名称 | 描述 | 类型 | 默认值 | 配置示例 |
---|---|---|---|---|
auto.commit.interval.ms | 消费者偏移量自动提交给Kafka的频率(以毫秒为单位) | int | 5000 |
文章中分别介绍了Kafka的整体架构、架构设计的细节、Kafka实现高性能所作出的努力及一些常用的配置。同时通过比较多的图解来详细说明一些复杂逻辑。
传统文件访问:
mmap的作用是映射文件描述符和指定文件的(off_t off)区域至调用进程的(addr,addr *len)的内存区域,如下图所示:
直接将文件映射到内存中,且不需要经过cache、分页物理存储
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。