赞
踩
kafka作为一个MQ,我们将kafka分为服务端和客户端来讲解。服务端指kafka服务,即接收并存储消息的服务。客户端指我们在自己项目里调用kafka提供的JAVA API实现消息生产者和消费者的功能。本文我们介绍kafka服务端的工作机制和原理,只有了解和熟悉了kafka服务端的原理,才可以更好的在客户端实现生产者和消费者的功能。
消息:
就是MQ中的消息概念。kafka只接收字节数组的消息。
主题:
在MQ中,主题的概念我们并不陌生。主题是对消息的分类,它类似于数据库中表的概念。
分区:
kafka中引入了分区的概念。每个主题下,可以包括若干个分区。消息进入主题后,会存入不同的分区里。分区是服务器上的文件,消息进入分区,其实就是写入了分区文件。所以,我们理解分区,就是消息文件即可。一个主题有若干分区,也就是消息写入了若干个文件。
由此可知,kafka中是把消息写入文件的,这也是kafka的特点之一:消息持久化
偏移量:
偏移量是kafka分区里内部维护的 一个元数据,在消息创建的时候会把偏移量加入消息中,是一个递增的整数。我们可以理解为,在某个分区中,偏移量就是消息的脚标即可。一个偏移量代表着一个消息。
kafka单机版和集群的安装,我们这里不进行讲解,网上教材很多,可以自行查阅并安装。这里我们讲解kafka安装成功后,在kafka服务中的一些管理命令。
1.创建主题命令:
kafka-topics.bat --zookeeper localhost:2181 --create --topic my-topic --replication-factor 1 --partitions 8
–topic:主题名称
–replication-factor:副本数,在单机中,副本数只能是1。在集群中,副本数要<=集群节点数量。这个在集群时我们会讲到。
–partitions:分区数。我们上面讲到主题下包含分区,这里我们创建了8个分区。
执行结果如下:
2.增加分区命令:
kafka-topics.bat --zookeeper localhost:2181 --alter --topic my-topic --partitions 16
将分区由8个增加到了16个。需要注意的是,分区只能增加,无法删除。一般的,我们创建主题时,会根据数据量大小计算出分区的数量,分区确定后,不再进行分区的增加。因为增加分区会造成再均衡,影响性能,这点儿我们后面进行讲解。
3.查询主题命令:
kafka-topics.bat --zookeeper localhost:2181 --list
这里需要用到zookeeper的ip和端口。kafka的运行依托于zookeeper。kafka自带zookeeper。我们也可以自己运行单独的zookeeper。在安装kafka时,我们应该已经很清楚这些知识了。
4.查询主题详情:
kafka-topics.bat --zookeeper localhost:2181 --describe
我们看输出结果:
该命令列出了每个分区的详情信息。
Partition:代表分区编号。
Leader:代表首领,首领的概念我们下面再进行讲解。
Replicas:代表副本
ISR:kafka维护的副本列表,在第二节中我们讲解
5.创建生产者:
kafka-console-producer.bat --broker-list localhost:9092 --topic my-topic
创建生产者后,会进入控制台,我们可以向kafka服务发送消息。如下图所示:
这样,我们就通过生产者把消息发送给kafka服务了。由此可知,在kafka的服务端,可以手动创建生产者,并发送命令。在我们实际应用中,我们更多的是在客户端通过JAVA API创建生产者并发送我们的业务数据信息。
6.创建消费者:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning
–bootstrap-server:写kafka的ip地址和端口。kafka端口默认是9092。
执行命令,我们可以看到,消费者收到了刚才生产者发送的消息:
同样的,在实际应用中,消费者也是在客户端通过JAVA API进行创建和消费消息。
7.列出消费者群组信息和消费者群组详细信息:
./kafka-consumer-groups.sh --bootstrap-server 192.168.3.211:9092,192.168.3.211:9093,192.168.3.211:9094 --describe --group 群组名;
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --describe --group 群组名
这两个命令仅支持Linux环境下使用。消费者群组的概念,我们在讲解消费者知识时,再进行解释。
kafka最重要的配置文件是config文件夹下的server.properties文件。我们看里面的配置内容(不同版本的kafka配置内容略有不同,我们此处以kafka_2.11-2.3.0版本为例):
在集群环境下,每个broker(节点)的集群编号。
这个我们简单理解为配置kafka服务地址和ip即可。 如:PLAINTEXT://127.0.0.1:9092
zookeeper 集群的地址,可以是多个,多个之间用逗号分割。(一组 hostname:port/path 列表,hostname 是 zk 的机器名或 IP、port 是 zk 的端口、/path 是可选 zk 的路径,如果不指定,默认使用根路径)
上面我们提到,分区就是存放消息的文件。这个属性就是在设置文件的路径。可以设置多个路径,中间用逗号隔开。如果设置了多个路径,则kafka会以"最少使用原则",将不同分区的数据写入不同的文件路径下。但是同一个分区,会保证写在同一个文件路径下。
"最少使用原则":加入我们定义log.dirs的路径有两个,主题的分区也有两个。那么,当第一个分区有数据时,kafka会把数据写入第一个log.dirs写的路径,当第二个分区有数据时,因为第一个路径已经有文件写入了,但是第二个路径没有写入文件,那么此时,第二个路径就是使用"最少"的了。这时,kafka就会把第二个分区的数据写入第二个路径下。这就是所谓的"最少使用原则"。一句话概括就是:哪里用的少,就用哪里。
每数据目录用于日志恢复启动和关闭时的线程数量。因为这些线程只是服务器启动(正常启动和崩溃后重启)和关闭时会用到。所以完全可以设置 大量的线程来达到并行操作的目的。注意,这个参数指的是每个日志目录的线程数,比如本参数设置为 8,而 log.dirs 设置为了三个路径,则总共会启动 24 个线程。
是否允许自动创建主题。如果设为 true,那么 produce(生产者往主题写消息),consume(消费者从主题读消息)或者 fetch metadata(任意客户端 向主题发送元数据请求时)一个不存在的主题时,就会自动创建。缺省为 true。
我们上面讲到,在kafka的服务端命令里,也可以创建消费者,由此可知,在服务端命令和客户端生产者消费者里,都可以新建主题。
删除主题配置,默认未开启。
每个新建主题的分区个数(分区个数只能增加,不能减少 )。这个参数一般要评估。
日志保存时间,默认为 7 天(168 小时)。超过这个时间会清理数据。bytes 和 minutes 无论哪个先达到都会触发。与此类似还有 log.retention.minutes 和 log.retention.ms,都设置的话,优先使用具有最小值的那个。(提示:时间保留数据是通过检查磁盘上日志片段文件的最后修改时间来实现的。也就 是最后修改时间是指日志片段的关闭时间,也就是文件里最后一个消息的时间戳)。下面我们会单独讲解kafka的日志过期删除机制。
topic 每个分区的最大文件大小,一个 topic 的大小限制 = 分区数*log.retention.bytes。-1 没有大小限制。log.retention.bytes 和 log.retention.minutes 任意一个达到要求,都会执行删除。(注意如果是 log.retention.bytes 先达到了,则是删除多出来的部分数据),一般不推荐使用最大文件删除策略,而是推 荐使用文件过期删除策略。在下面日志过期删除机制中进行详解
分区的日志存放在某个目录下诸多文件中,这些文件将分区的日志切分成一段一段的,我们称为日志片段。这个属性就是每个文件的最大尺寸;当 尺寸达到这个数值时,就会关闭当前文件,并创建新文件。被关闭的文件就开始等待过期。默认为 1G。 如果一个主题每天只接受 100MB 的消息,那么根据默认设置,需要 10 天才能填满一个文件。而且因为日志片段在关闭之前,消息是不会过期的,所 以如果 log.retention.hours 保持默认值的话,那么这个日志片段需要 17 天才过期。因为关闭日志片段需要 10 天,等待过期又需要 7 天。
由此可知,在一个分区中,并不是只有一个文件,而是根据此参数,切分成了多个文件。过期删除文件时,也是一个文件一个文件的删除,而不是一个分区中所有的文件全部删除。
表示一个服务器能够接收处理的单个消息的最大字节数,注意这个值 producer 和 consumer 必须设置一致,且不要大于 fetch.message.max.bytes 属性的值 (消费者能读取的最大消息,这个值应该大于或等于 message.max.bytes)。该值默认是 1000000 字节,大概 900KB~1MB。如果启动压缩,判断压缩后的值。 这个值的大小对性能影响很大,值越大,网络和 IO 的时间越长,还会增加磁盘写入的大小。 Kafka 设计的初衷是迅速处理短小的消息,一般 10K 大小的消息吞吐性能最好(LinkedIn 的 kafka 性能测试)
我们提到对服务器的要求,一般从CPU、内存、磁盘容量、磁盘IO等方面进行考虑。下面我们看kafka对这几个方面的要求。
磁盘容量/磁盘吞吐量
存盘容量根据我们的业务需求来定。比如,我们每天收到1TB的数据,我们要保留7天,那么久需要7TB的容量。
磁盘吞吐量这个东西不是很好评估,总之,如果机械硬盘觉得慢的话,就换成固态硬盘。
内存
kafka总体来说对内存的要求不是太高。消费者消费消息时,是从页面缓存中读取数据的,这里涉及到了内存。一般服务器的内存,都可以满足kafka的内存需求。不过服务器上不要装太多的其他服务,最好为kafka单独配一台服务器。
网络
kafka对网络的要求较高,kafka用来处理大数据量,所以,网络带宽高了网络IO才会快。同时,集群中数据的复制也需要占用网络。
CPU
kafka对CPU要求不是太高,一般服务器的CPU足以满足。
在上面的配置文件中我们讲到,log.dirs配置分区日志文件的路径。那我们就看一下分区文件中,都有哪些内容。这里需要强调的是,kafka服务中logs文件夹下的日志,是服务自身的日志,和分区日志不是一回事儿。
日志中有我们的"主题-分区编号"文件夹,我们创建了8个分区,所以这里有8个文件夹。这些文件夹里存放的文件如下:
第一个.index文件是kafka自身维护的一个索引文件,根据索引文件快速查询分区中的消息。手动删除索引文件后kafka会再生成一个。这里我们不用关注这个文件。
.log文件存储了消息。.log文件根据server.properties中配置的log.segment.bytes属性,会划分成多个.log文件。这里,我们将log.segment.bytes设置成140,然后创建生产者发送消息,可以看到,生成的日志文件如下:
可以看到,生成了多个log文件和index文件。其中,正在被写入的日志片段,我们称之为活跃片段。
需要注意的是,log.segment.bytes的值最小是14(不同版本的kafka可能这个值不同)。如果这个值设置过小,当生产者生产消息时,服务端会报The request included message batch larger than the configured segment size on the server这种错误。
.tmeindex是时间戳文件,kafka自身维护的文件。我们无需关注这些文件。
在日志文件中,除了存储消息的log,还有记录偏移量的文件,如下图:
由此可知,分区的日志文件中,包括分区日志和偏移量记录两大内容。有关偏移量的概念,我们在消费者中进行讲解。
kafka的过期机制是分区日志的保留时间和分区日志文件的大小共同决定的。在server.properties中,log.retention.hours设置了日志保留的时间,当一个日志片段从最后一次修改时间到现在的时间超过设置值时,该日志片段文件就会删除。这里需要注意的是,最后一次写入时间,而不是日志文件创建的时间。也就是说,日志片段到达规定的文件大小后,就创建新的日志片段写文件,写满的日志片段在过了时间后,就自动清除了。
server.properties中设置的log.retention.bytes是分区中文件大小的限制。超过这个设置值,分区中的数据就会被删除,不管设置的过期时间是否过期,都会被删除。所以说,kafka是受这两方面影响的。
一般的,我们在实际应用中,不建议使用超过多少大小就删除这种机制,这种操作是很危险的,很可能将我们还想要的数据删除了。一般我们就使用设置时间过期的参数,同时配合log.segment.bytes参数,将日志划分片段,来进行fakfa的删除策略。这样,写满的日志片段在过期后进行删除,我们可以在过期之前确保数据都已经没用了。
上面介绍的是设置全局的主题过期时间,如果想单独设置某个主题的过期时间,需要在管理端通过命令进行设置,命令如下:
./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name 主题名称 --entity-type topics --add-config retention.ms=86400000
查看是否生效:
./kafka-config --zookeeper localhsot:2181 --describle --entity-name 主题 --entity_type topics
删除单个主题的过期日期命令:
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic 主题 --config cleanup.policy=delete
设置某个主题的过期数据为立刻删除:
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic 主题 --config cleanup.policy=delete
补充:日志的过期策略有两种:一种是delete,删除,此为默认方式。一种是Compaction,压缩,将key值一样的数据,value压缩成最后的那一条。在全局配置中,通过log.cleanup.policy属性配置
分区数在生产环境5个左右。不超过10个。具体确定个数,需要我们结合实际场景。分区的作用就是提高了消息读写的吞吐量,所以,确定分区的数量,要按照吞吐量要求来确定。比如,我们需要某个主题的吞吐量是1G/s。而单个分区的吞吐量是100MB/s。那么,我们就需要设置10个分区,来满足其吞吐量。
kafka中顺序性是指生产者发送消息的先后顺序与kafka服务端消息先后顺序的一致性。发往每个分区中的消息,顺序性是有保障的,但是消息发往了不同的分区,则顺序就无法保障了。所以,对于一些顺序性很强的业务场景,我们有两种方式来保障顺序性:一种是一个主题只设置一个分区,这样的话,主题的消息全发往了这个分区,是按序的,避免了消息发往多个分区而造成的无序性。但是这么做大大影响了kafka的效率。第二种方式是将顺序性强的数据,通过生产者设置key值的方式,放入一个分区中,这样也是只在一个分区做保证的方法。(key值等知识在生产者中进行讲解)。
即使设置成了一个分区,也是有无序风险的,我们假设这种场景:两个生产者给kafka服务发送消息,第一个消费者先发送,但是这时网络出现了抖动,没有发送成功,kafka生产者会有重试机制,重新发送消息。假如在第一个生产者重试期间,第二个生产者发送消息成功了,那么第二个生产者的消息就选写入了kafka分区中,第一个
生产者重试发送消息成功,他的消息就后存入分区中,这样消息顺序也出现了错误。为了防止这种情况的出现,我们可以把server.properties文件中的max.in.flight.requests.per.connection设置为1,这样设置就是当一个生产者进行重试时,不会接收其他生产者发送的消息。这样就避免了上述情况的出现。但是max.in.flight.requests.per.connection设置为1严重影响了kafka的吞吐量,在顺序要求不高的场景中,我们不这么进行设置。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。