赞
踩
目录
官网:Apache KafkaApache Kafka: A Distributed Streaming Platform.https://kafka.apache.org/
①、Kafka是由LinkedIn开发的一个分布式的消息系统,底层使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。
②、Kafka是一个分布式数据流平台,可以从下面几个层面来理解:
1、我们可以向Kafka发布数据以及从Kafka订阅数据,即我们可以将kafka看为一个消息队列,所以启动的作用:缓冲(消峰限流),实现生产与消费的解耦。
2、Kafka可以存储数据,并提供容错机制,即数据丢失后可以进行恢复
3、当数据到达Kafka之后,可以马上被消费处理,即Kafka的延迟很低
综合来说,Kafka具备上面三个明显特点,根据Kafka的这个特点,Kafka的适用场景是:搭建实时流平台的中间件:
在实际工作中,对数据的处理方式有两种:
1、离线批处理
2、实时流处理
③、Kafka是一种分布式的,基于发布/订阅的消息系统,能够高效并实时的吞吐数据,以及通过分布式集群及数据复制冗余机制(副本冗余机制)实现数据的安全 。
①、在software目录下上传或下载Kafka并解压
cd /home/software/
rz 版本: kafka_2.10-0.10.0.1.tgz解压
tar -xvf kafka_2.10-0.10.0.1.tgz
②、配置文件
进入Kafka的confit目录,编辑server.properties文件:
broker.id=1
log.dirs=/home/software/kafka_2.10-0.10.0.1/kafka-logs
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
③、通过远程拷贝,将01的kafka发送给hadoop02与03,更改broker.id即可
进入software目录
cd /home/software/
拷贝:
scp -r kafka_2.10-0.10.0.1 hadoop02:/home/software/
scp -r kafka_2.10-0.10.0.1 hadoop03:/home/software/
进入到Hadoop02主机:cd /home/software/kafka_2.10-0.10.0.1/config/
vim server.properties
进入到hadoop03主机:
cd /home/software/kafka_2.10-0.10.0.1/config/
vim server.properties
④、启动zookeeper集群:
进入zookeeper的bin目录,执行:
cd /home/software/zookeeper-3.5.7/bin/
./zkServer.sh start./zkServer.sh status 查看状态
⑤、启动Kafka集群
进入kafka的bin目录,执行:三台都要启动
cd /home/software/kafka_2.10-0.10.0.1/bin/
./kafka-server-start.sh ../config/server.properties退出:ctrl+c
①、producer:生产者,可以是一个测试线程,也可以是某种技术框架(比如flume)
②、producer向kafka生产数据,必须指定向哪个主题去生产数据
③、主题:topic,主题是由用户自己来创建的
④、创建一个主题,需要指定:
Ⅰ、主题名
Ⅱ、主题分区数量
Ⅲ、分区副本数量
⑤、主题的分区:本质上就是一个分区文件目录,三台主机总会有一个含有。
可以进入查看:cd /home/software/kafka_2.10-0.10.0.1/kafka-logs/
分区目录的命名规则:主题名-分区编号(分区编号从0开始)
思考:kafka主题引入分区机制的作用是什么?
可以分布式的对一个主题的数据进行存储和管理,比如:发送obc数据,给到0,然后1,然后2,然后再012。
补充:
主题的分区数量可以远大于kakfa broker服务器数量,kafka底层尽可能确保分区目录的负载均衡,比如:一个主题由10个分区,有3个broker服务器,则分区目录的数量分配:3-3-4
⑥、producer向kafka指定的主题生产数据,数据最终是存到了分区目录下的log文件中,此外kafka底层会确保每个分区目录的数据达到负载均衡的效果(轮询发送给每个分区的目录)
⑦、Kafka支持数据的容错机制,即分区数据丢失后,可以恢复,通过副本冗余机制来实现,即我们在创建主题时,可以指定每个分区有多个副本
补充:如果出现创建主题分区异常,主要检查zookeeper状态,如果报错:Error Start,查看zookeeper.out文件
⑧、分区的副本机制,是为了数据容错。
sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 3 --partitions 1 --topic frbook
效果为三台主机都有frbook-0
补充:
Ⅰ、分区副本数量不能大于broker服务器数量
Ⅱ、分区的副本数量不宜过多,副本数量越多,集群磁盘的利用率越低,比如3副本,集群磁盘利用率:33%
在实际生产环境下,一般3个副本足够了,2个副本也可以,如果1个则没有容错机制,所以一般为2个或者3个,
综上,kafka最基础和核心的概念就是topic主题,因为无论是向kafka生产数据,还是从kafka消费数据,都需要指定主题。
主题topic属性:
主题名 topic name
分区数量 partition
分区的副本数量 replication
⑨、从kafka 消费数据,消费者可以是一个线程,也可以是某种技术框架(Spark,Flink)
⑩、kafka的特点:kafka的数据,无论消费与否,会一直存在,不会删除
在调用kafka相关指令时,如果涉及到zookeeper的,写一台即可,如果涉及kafka的,有几个就要写几个。
①、创建自定义的topic
在bin目录下执行:
cd /home/software/kafka_2.10-0.10.0.1/bin/
sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic enbook
注:副本数量要小于等于节点数量
②、查看所有的topic
执行:sh kafka-topics.sh --list --zookeeper hadoop01:2181
③、启动producer:生产者线程, hadoop01:9092,有几台kafka写几个
执行:sh kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic enbook
启动后输入值
然后在第三台主机查看:
cd enbook-0/
vim 00000000000000000000.log
④、启动consumer
进入:cd /home/software/kafka_2.10-0.10.0.1/bin/
执行:sh kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic enbook --from-beginning
⑤、可以通过producer和consumer模拟消息的发送和接收
生产:输入hello java
消费:这边就可以拿到hello java
⑥、删除topic指令:
进入bin目录,执行:sh kafka-topics.sh --delete --zookeeper hadoop01:2181 --topic enbook
可以通过配置 config目录下的 server.properties文件,加入如下的配置:
上图可以认为:
主题名:frbook
分区数:2个
分区副本:3个
知识点:
1、kafka是有Leader和Follower概念的,注意:是针对分区的副本而言,不是针对broker来说的,kafka集群,borker没有主从之分
2、分区的副本Leader的作用:无论是生产数据还是消费数据,都是和分区副本的Leader交互的
3、如果分区副本的Leader挂掉了,Kafka会从剩余的Follower中选出新的Leader
知识点:
1.producer:消息生产者,发布消息到 kafka 集群的终端或服务。
2.broker:kafka 集群中包含的服务器。broker (经纪人,消费转发服务)之间不分主从
3.topic:主题,每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
主题名
主题的分区数,分区本质上是一个分区目录,分区机制的作用可以分布式的存储和管理主题数据
分区的副本数量,副本机制是可以实现数据的容错,副本分为Leader和Follower,生产和消费底层都是直接和分区副本的Leader交互。
4.partition:partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
5.consumer:消费者,从 kafka 集群中消费消息的终端或服务。
6.Consumer group:消费者组,high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
组间共享消费,组内竞争消费。
在创建主题时,分区数量可以多一些,避免消费者组内的一些线程闲置浪费。
7.replica:partition 的副本,保障 partition 的高可用。副本数量不宜过多,因为降低进群磁盘的利用率。比如3副本,磁盘利用率1/3.
8.leader:replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
9.follower:replica 中的一个角色,从 leader 中复制数据。
10.controller:控制进程,kafka 集群中的其中一个服务器,用来进行 leader 的选举以及失败恢复,Controller进程的信息是由Zookeeper来维护的,如果宕机,会在剩下的broker服务器启动
11.zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息(主题名,主题分区数,分区副本数,副本leader的位置,Controller位置等)和监控kafka集群的运行(临时节点+监听机制)。
上图所示,向一个主题生产数据,数据最终是存储到各个分区中。
分区从逻辑上来看,实际上是一个队列
分区从物理上来看,实际上是一个分区目录
向分区中存储数据,最终是存到的分区目录下的log文件中
底层实际上是数据磁盘的顺序写操作(往文件末尾追加),所以kakfa的写入性能较高
上图所示,从分区消费数据,kafka底层有一个offset机制,kafka会记录消费者的offset(消费者位置偏移量),便于下一次从正确的位置进行消费。
Kafka记录offset两个版本:
旧版本:kafka是将offset存到zookeeper上的,存在的问题:会频繁的和Zookeeper进行通信交互,极可能会为Zookeeper带来较高的访问负载。
新版本:kafka自己来管理offset,以消费者组为单位进行管理
验证思路:
1、启动一个消费者线程(需要带有消费者属性)
sh kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic enbook --from-beginning --new-consumer
2、会发现多出一些目录
cd /home/software/kafka_2.10-0.10.0.1/kafka-logs/
3、即kafka底层是通过一个主题来进行管理的,主题名是__consumer_offsets,分区是50个
4、某个消费者组的offset就存在这50个分区目录中的其中一个存储位置,规则:groupID.hashcode%50=取余结果就是对应的分区目录
查看groupID:
sh kafka-consumer-groups.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --list --new-consumer
Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic(主题)。
Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition.
Topic在逻辑上可以被认为是一个queue,每条消息都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。
为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,如下图所示。
因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
流程:
1、producer要向kafka产生数据,需要先通过Zookeeper获取副本Leader的位置信息
2、Producer向Leader发送数据
3、Leader收到数据后,将数据写入到分区目录下的log文件
4、Follower从Leader同步数据,将数据写入到分区目录下的log文件中,如果同步成功(将数据写入log文件成功),则向Leader返回ACK(确认机制)
细节:
Kafka引入一个ISR机制(概念),在Follower和Leader数据同步的过程中,比如:
①、副本-Follower ②-副本-Leader ③副本-Follower
在数据同步过程中,①②同步,③出现故障没有跟上,此时①②是同一组ISR,③不是,如果后续Leader挂掉了,则kafka会从Leader的ISR组中随机选择一个Follower变为Leader。
Kafka底层有一个同步超时的时间(10s),即一个Follower在超时时间内没有反馈ACK,则认为同步失败。
由写入流程可知ISR里面的所有replica都跟上了leader,只有ISR里面的成员才能选为leader。对于f+1个replica,一个partition可以在容忍f个replica失效的情况下保证消息不丢失。
比如一个分区有5个副本,挂了4个,剩下一个副本,依然可以工作。
注意:kakfa的选举不同于zookeeper,用的不是过半选举。
当所有 replica 都不工作时,有两种可行的方案:
1. 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。
kafka 0.8.* 使用第二种方式。此外, kafka 通过 Controller 来选举 leader。
5、Leader向Producer返回ACK
①、数据文件的分段与索引,Kafka 为了提高数据的读取速率,引入了索引机制,为分区目录下的log文件会创建一个对应的index文件
②、Kafka解决查询效率的手段之一是将数据文件分段,可以配置每个数据文件的最大值,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。
③、每个log文件默认是1GB生成一个新的Log文件,比如新的log文件中第一条的消息的offset 16933,则此log文件的命名为:000000000000000016933.log,此外,每生成一个log文件,就会生成一个对应的索引(index)文件。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。
④、数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引——Offset与position(Message在数据文件中的绝对位置)的对应关系。
稀疏索引+二分查找,可以加快查找速度
⑤、index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
索引文件被映射到内存中,所以查找的速度还是很快的。
在一个分布式发布订阅消息系统中,组成系统的计算机总会由于各自的故障而不能工作。在Kafka中,一个单独的broker,可能会在生产者发送消息到一个topic的时候宕机,或者出现网络故障,从而导致生产者发送消息失败。根据生产者如何处理这样的失败,产生了不通的语义:
①、至多一次语义(At most once semantics):
如果生产者在ack超时或者返回错误的时候不重试发送消息,那么消息有可能最终并没有写入Kafka topic中,因此也就不会被消费者消费到。但是为了避免重复处理的可能性,我们接受有些消息可能被遗漏处理。
②、至少一次语义(At least once semantics):
如果生产者收到了Kafka broker的确认(acknowledgement,ack),并且生产者的acks配置项设置为all(或-1),这就意味着消息已经被精确一次写入Kafka topic了。然而,如果生产者接收ack超时或者收到了错误,它就会认为消息没有写入Kafka topic而尝试重新发送消息。如果broker恰好在消息已经成功写入Kafka topic后,发送ack前,出了故障,生产者的重试机制就会导致这条消息被写入Kafka两次,从而导致同样的消息会被消费者消费不止一次。每个人都喜欢一个兴高采烈的给予者,但是这种方式会导致重复的工作和错误的结果。
③、精确一次语义(Exactly once semantics):
即使生产者重试发送消息,也只会让消息被发送给消费者一次。精确一次语义是最令人满意的保证,但也是最难理解的。因为它需要消息系统本身和生产消息的应用程序还有消费消息的应用程序一起合作。比如,在成功消费一条消息后,你又把消费的offset重置到之前的某个offset位置,那么你将收到从那个offset到最新的offset之间的所有消息。这解释了为什么消息系统和客户端程序必须合作来保证精确一次语义。
综上,这三层语义,没有好坏之分,主要看具体的应用场景
比如:允许数据丢失但追求高性能,使用at least noce
对数据精度要求非常高,一条不能丢失并且结果严格准确,使用exactly once
很多框架默认是第二种(折中的),如何降低数据重复处理的可能性呢?可以适当调多超时时间,尤其是网络环境不好的时候。
在kafka中,可以通过配置文件来进行设定:
在server.properties文件:
至多一次
acks=0
至少一次
acks=1 只接受副本Leader的ACK 常用的
acks=2 接受副本Leader的ACK和一个Folloewer的ACK
acks=3 接受副本Leader的ACK和两个Follower的ACK
精确一次
acks=1
enable.idempotence=true
一个幂等性的操作就是一种被执行多次造成的影响和只执行一次造成的影响一样的操作。现在生产者发送的操作是幂等的了。如果出现导致生产者重试的错误,同样的消息,仍由同样的生产者发送多次,将只被写到kafka broker的日志中一次。对于单个分区,幂等生产者不会因为生产者或broker故障而发送多条重复消息。想要开启这个特性,获得每个分区内的精确一次语义,也就是说没有重复,没有丢失,并且有序的语义,只需要设置producer配置中的”enable.idempotence=true”。
零拷贝技术
这个很简单的网络文件的输出过程,在OS底层,会发现数据被拷贝4次。
内核态可以理解为特权态,可以访问计算机的所有资源,而用户态的访问资源是受限的。
上图中,如果要对文件数据修改,则只能在用户态的缓冲区修改,所以需要拷贝4次。
但如果仅仅是发送文件数据,则拷贝4次是没有意义的,并且还是产生4次内核态和用户态的切换,这些都需要耗费cpu的性能。
使用Zero Copy技术示意图:
小结:
Kafka的写入性能高:因为底层是磁盘顺序写;
Kafka的读取性能高:因为底层是由索引机制;
Kafka的传输性能高:因为底层使用Zero Copy技术;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。