赞
踩
Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级项目。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。它主要用于处理活跃live的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。
三大特点:
高吞吐量
可以满足每秒百万级别消息的生产和消费——生产消费。
持久性
有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。
分布式
基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体
健壮性。
高吞吐率 :在廉价的商用机器上单机可支持每秒100万条消息的读写
消息持久化 :所有消息均被持久化到磁盘,无消息丢失,支持消息重放
完全分布式 :Producer,Broker,Consumer均支持水平扩展
同时适应在线流处理和离线批处理
Kafka的服务:
Kafka服务相关:
注意:
Kafka中的broker对于调用者而言都是透明的,也就是说各个broker的地位都是一样的,但是在kafka内部有区分,主要就是controller和非controller之分,controller的角色我们可以在zookeeper的对应目录/kafka/controller中获取对应的brokerid。
在kafka1.0以下的版本中使用zk来保存kafka消费者的offset(目录为/kafka/consumers/**),但是在kafka1.0以上,不再使用zookeeper来保存,主要原因在于,避免zookeeper负载过高,造成相关联的框架无法使用,此时在kafka提供了一个系统级别的topic:__consumer_offsets来报错偏移量信息。
Topic
逻辑概念,同一个Topic的消息可分布在一个或多个节点(Broker)上
一个Topic包含一个或者多个Partition
每条信息都属于且仅属于一个Topic
Producer发布数据是,必须制定该消息发布到哪一个Topic
Consumer订阅消息时,也必须制定订阅哪个Topic的消息
Partition
物理概念,一个Partition只分布在一个Broker上(不考虑备份)
一个partition物理上对应一个文件夹
一个Partition包含多个Segment(Segment对用户透明)
一个Segment对应一个文件,Segment由一个个不可变记录组成
记录只会被append到Segment中,不会被单独删除或者修改
清除过期日志时,直接删除一个或多个Segment
segment文件(log文件)文件名规范: 这个文件里面第一条消息的offset - 1
flush策略
############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk ## 每当每一个topic接收到10000条message的时候,就会将数据flush到磁盘 log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #每个1s flush一次数据 log.flush.interval.ms=1000
为了提供kafka的读写数据能力,首先接收数据到kafka内存,不可能无限制的保存在内存,所以必然会将数据flush到磁盘(partition的segement)文件,在flush的时候做了Durability和Latency和Throughput的权衡与取舍。
retention策略
############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age # 日志最小的保留时间:7天,超过这个时间,数据可能会被清理掉 log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned(裁剪) from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. ## segement文件如果超过log.retention.bytes的配置,将会可能被裁剪,直到小于log.retention.bytes配置 #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. # 一个segment文件最大的大小,超过log.segment.bytes一个G,将会创建一个新的segment文件 log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies ## 每隔5分钟,检测一次retention策略是否达到 log.retention.check.interval.ms=300000
partition对应的文件,就保存在一个个的segment文件中,每一个文件默认大小是1G,但是log.retention.check.interval.ms监测频率是5分钟一次,所以segment文件可能会超过1G,此时就会启动retion策略,将文件裁剪到log.retention.bytes配置,如果超过了log.segment.bytes=1G配置,将会创建一个新的segment文件;默认情况,segment文件会保留7天。
.index文件和.log文件说明
partition分区目录下的文件列表,主要包含两种类型的文件 x.index索引文件和x.log segment文件,其中x.log保存的是message信息,x.index保存的是索引数据。
这二者文件的大致结果如下:
为什么会出现消息offset和文件中的偏移量不一样的问题?
因为一个partition下面有多个segment文件,segment文件当达到retention策略之后将会被裁剪或删除,同时partition中的offset是单调递增的,从0开始增加,但是segment文件中的消息在该文件中的偏移量指的是文件开头到该文件走过的字节长度,显然这两个不一样。
所以,直接根据msg的offset是无法直接读取到消息的,那怎么办?所以此时就需要俺们的x.index中保存的相对偏移量来帮忙了。
x.index中保存的内容:
也就是说index文件的序号对应的是log文件中的消息偏移量;index文件中的地址栏对应的是log文件中文件中的便宜字节。
kafka-run-class.sh kafka.tools.DumpLogSegments \
--print-data-log \ --->打印读取到的segment日志文件内容
--files 00000000000000000000.log --->指定读取的segment日志文件
读取到的数据格式如下:
其中的offset是该条message在该partition中的偏移量,position为该条消息在该文件中的字节偏移量。
消息检索过程
以这个partition目录下面,00000000001560140916为例,现在要定位offset 为1560140921的message
定位到具体的segment日志文件
由于log日志文件的文件名是这个文件中第一条消息的(offset-1). 因此可以根据offset定位到这个消息所在日志文件:00000000001560140916.log
计算查找的offset在日志文件的相对偏移量
segment文件中第一条消息的offset = 1560140917
计算message相对偏移量:需要定位的offset - segment文件中第一条消息的offset + 1 = 1560140921 - 1560140917 + 1 = 5
查找index索引文件, 可以定位到该消息在日志文件中的偏移字节为456. 综上, 直接读取文件夹00000000001560140916.log中偏移456字节的数据即可。
1560140922 -1560140917 + 1 = 6
如果查找的offset在日志文件的相对偏移量在index索引文件不存在, 可根据其在index索引文件最接近的上限偏移量,往下顺序查找
说明:kafka的controller的选举是在所有kafka节点启动的时候发生的,或者当controller挂掉,再从其余的broker中选举出一台作为controller。
所以查看controller的选举入口,最简单就是kafka的启动,通过kafka-server-start.sh脚本,发现该类为kafka.Kafka。
零拷贝
传统模式下数据从文件传输到网络需要4次数据拷贝,4次上下文切换和2次系统调用
通过NIO的transferTo/transferFrom调用操作系统的sendfile实现零拷贝。总共2次内核数据拷贝,2次上下文切换和1次系统调用,消除了CPU数据拷贝
批处理和压缩
Producer和Consumer均支持批量处理数据,从而减少了网络传输的开销
Producer可将数据压缩后发送给broker,从而减少网络传输代价。目前支持Snappy,Gzip和LZ4压缩
Partition
通过Partition实现了并行处理和水平扩展
Partition是Kafka(包括kafka Stream)并行处理的最小单位
不同Partition可处于不同的Broker,充分利用多机资源
同一Broker上的不同Partition可置于不同的Directory,如果节点上有多个Disk Drive,可将不同的Drive对应的Directory,从而是Kafka充分利用Disk Drive的磁盘优势
ISR
ISR实现了可用性和一致性的动态平衡
replica.log.time.max.ms=10000
replica.log.max.messages=4000
ISR可容忍更多的节点失败
Majority Quorum如果要容忍f个节点失败,至少需要2f+1个节点
ISR如果要容忍f个节点失败,至少需要f+1个节点
如何处理Replica Crach
Leader crash后,ISR中的任何replica皆可竞选称为Leader
如果所有replica都crash,可选择让第一个recover的replica或者第一个在ISR中的replica称为leader
unclean.leader.election.enable=true
producer和吞吐量成正比
consumer数据量在没有达到partition个数之前,和消费的吞吐量成正比。
分区格式和生成的吞吐量,在一定范围内,先增长,当达到某一个值之后区域稳定,在上下浮动
随着message size的增大,生产者对应的每秒生成的记录数在成下降趋势,区里的数据体积成上升趋势。
副本越大,自然需要同步数据的量就越多,自然kafka的生成的吞吐量就越低。
1.kafka-producer-perf-test.sh
bin/kafka-producer-perf-test.sh --topic spark \
--num-records 100000 \ -->测试生成多少条记录
--throughput 10000 \ --->生产这的吞吐量,约等于messages/sec
--record-size 10 \ -->每条消息的大小
--producer.config config/producer.properties
2.kafka-consumer-perf-test.sh
bin/kafka-consumer-perf-test.sh --topic spark \
--broker-list bigdata01:9092,bigdata02:9092,bigdata03:9092 \
--messages 100000 ---->总共要读取多少条记录
读取到的结果
start.time=2019-08-06 02:31:23:738 --->开始时间
end.time=2019-08-06 02:31:24:735 --->结束时间
data.consumed.in.MB=0.9534 --->总共消费的数据体积
MB.sec=0.9562 --->每秒钟消费数据体积
data.consumed.in.nMsg=100000 --->总共消费的数据记录数
nMsg.sec=100300.9027 --->每秒钟消费记录数
rebalance.time.ms=47 --->进行rebalance的时间
fetch.time.ms=950 --->抓取这10w条数据总共花费多长时间
fetch.MB.sec=1.0035 --->每秒钟抓取数据体积
fetch.nMsg.sec=105263.1579 --->每秒钟抓取数据记录数
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。