赞
踩
包括Kafka的基本组成,Kafka的拓扑结构以及Kafka的内部通信协议。Kafka内部的通信协议是建立在Kafka的拓扑结构之上,而Kafka的拓扑结构是由Kafka的基本模块所组成的。
AK RELEASE 2.5.0
APRIL 15, 2020
Kafka集群中生产者将消息发送给以Topic命名的消息队列Queue中,消费者订阅发往以某个Topic命名的消息队列Queue中的消息。其中Kafka集群由若干个Broker组成,Topic由若干个Partition组成,每个Partition里的消息通过Offset来获取。
基本组成包括:
一个典型的Kafka集群中包含若干个Producer(可以是某个模块下发的Command,或者是Web前端产生的PageView,或者是服务器日志,系统CPU、Memory等),若干个Broker(Kafka集群支持水平扩展,一般Broker数量越多,整个Kafka集群的吞吐率也就越高),若干个Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置。Producer使用Push模式将消息发布到Broker上,Consumer使用Pull模式从Broker上订阅并消费消息。
简单的消息发送流程如下:
Kafka内部各个Broker之间的角色并不是完全相等的,Broker内部负责管理分区和副本状态以及异常情况下分区的重新分片等这些功能的模块称为KafkaController。每个Kafka集群中有且只有一个Leader状态的KafkaController,当其出现异常时,其余Standby状态的KafkaController会通过Zookeeper选举出有一个Leader状态的KafkaController。
Broker内部存在的功能模块包括SocketServer、KafkaRequestHandlerPool、LogManager、ReplicaManager、OffsetManager、KafkaScheduler、KafkaApis、KafkaHealthcheck和TopicConfigManager九大基本模块以及KafkaController集群控制管理模块。
后台还会维护一个日志合并线程,Kafka发送消息的时候需要携带3个参数(Topic,Key,Message),针对相同的Key值不同的Message只保留最后一个Key值对应的消息内容。
每个Broker内部都会存在一个KafkaController模块,但是有且只有一个Broker内部的KafkaController模块对外提供控制管理Kafka集群的功能,例如负责Topic的创建、分区的重分配以及分区副本Leader的重新选举等。
Leader和Follower的选举是基于Zookeeper实现的,尝试在Zookeeper的相同路径上创建瞬时节点(Ephemeral Node),只有一个KafkaController会创建成功。其中负责状态管理的类为ZookeeperLeaderElector,字面意思上就可以看出是基于Zookeeper的Leader选举权。其中包含了controllerContext当前Topic的元数据信息以及集群的元数据信息等;electionPath为多个KafkaController竞争写的路径,其值为/controller;onBecomingLeader为状态转化成Leader时候的回调函数;onResigningAsLeader为状态转化位Follower时候的回调函数;brokerId为当前Broker Server的Id。ZookeeperLeaderElector启动后负责观察数据节点状态,瞬时节点消失触发再次选举,尝试写入的节点内容就是brokerId。
当选举为Leader时分为下面几步:
选举为Follower的时候分为下面几步,正好与Controller相反:
Topic的分区状态维护是由PartitionStateMachine模块负责的,通过在/brokers/topics 和 /admin/delete_topics目录上注册不同的监听函数,监听Topic的创建和删除事件触发Topic分区状态的转换。
PartitionStateMachine中分区状态由PartitionState用一个字节表示不同状态,分为四种:
每个状态都是由一个合理的前置状态转换而来。
Topic分区的Leader Replica在不同场景下的选举策略是不一样的,不同选举策略都基础PartitionLeaderSelector。其根据Topic、Partition、当前Leader、当前的ISR选举出新的Leader,新的ISR和新的AR(在线状态),共有5种不同的策略:
Topic分区的副本状态维护是由ReplicaStateMachine模块负责的,Topic分区的副本状态伴随着Topic分区状态的变化而变化
分区副本状态只要有7种:
目标状态也是由合理的前置状态转换而来的。
KafkaController内部通过监听函数来维护集群的元数据。
Partition的AR列表的第一个Replica称为“Preferred Replica”,并均匀分布在整个Kafka集群中。由于每个Partition只有Leader Replica对外提供读写服务,并且Partition创建的时候默认的Leader Replica位于Preferred Replica之上,此时Kafka集群的负载是均衡的,如果Kafka集群长时间运行,Broker Server中途由于异常而发生重启,此时Partition的Leader Replica会发生迁移,这样会导致其Partition的Leader Replica在集群中不再均衡了。
Topic是由Partition组成的,而Partition是由Replica组成的,因此只有Partition的Assigned Replica全部被删除了该Partition才可以被删除;只有Topic的所有Partition都被删除了,该Topic才可以最终真正的被删除。
ControllerChannelManager提供了Leader状态的KafkaController和集群其他Broker Server通信的功能,内部针对每一个在线的Broker Server会维护一个通信链路,并分别通过各自的RequestSendThread线程将请求发送给对应的Broker Server。
kafka-topics.sh提供了Topic的创建、修改、列举、描述、删除功能,在内部时通过TopicCommand类来实现的。
kafka-reassign-partitions.sh提供来重新分配分区副本的能力。该工具可以促进Kafka集群的负载均衡。因为Follower Replica需要从Leader Replica Fetch数据以保持与与Leader Replica同步,仅保持Leader Replica分布的平衡对整个集群的负载均衡时不够的。另外当Kafka集群扩容后,该工具可以将已有Topic的Partition迁移到新加入的Broker上。
分区重分片是一个异步的流程,因此该脚本还提供了查看当前分区重分配进度的指令。
kafka-preferred-replica-election.sh用于在整个集群中恢复Leader Replica为Preferred Replica。
生产者是指消息的生成者。生产者可以通过特定的分区函数决定消息路由到Topic的某个分区。消息的生产者发送消息有两种模式,分别为同步模式和异步模式。
kafka.javaapi.producer.Producer#send方法发送
指定 metadata.broker.list 属性,配置Broker地址
指定 partitioner.class 属性,配置分区函数,分区函数决定路由。分区函数必须实现 kafka.producer.Partitioner的 partition接口,参数为消息key值,分区总数,返回值为分区的索引。
Producer内部包括以下几个主要模块:
生产者由两种发送模式:同步和异步
当producer.type配置为sync时,同步发送消息。
当producer.type配置为async时,异步发送消息。
Kafka提供了两种不同的方式来获取消息:简单消费者和高级消费者。简单消费者获取消息时,用户需要知道待消费的消息位于哪个Topic的哪个分区,并且该目的分区的Leader Replica位于哪个Broker Server上;高级消费者获取消息时,只需要指定待消费的消息属于哪个Topic即可。
简单消费者提供的客户端API称为低级API,本质上客户端获取消息最终时利用FetchRequest请求从目的端Broker Server拉取消息。
FetchRequest请求中可以指定Topic的名称,Topic的分区,起始偏移量、最大字节数。
客户端无论生产消息还是消费消息,最终都是通过与目的地端Broker Server建立通信链路,并且以阻塞模式允许,然后通过该条链路将不同的请求发送出去。
高级消费者以Consumer Group(消费组)的形式来管理消息的消费,以Stream(流)的形式来提供具体消息的读取。Stream是指来自若干个Broker Server上的若干个Partition的消息。客户端需要正确设置Stream的个数,并且应该针对每个Stream开启一个线程进行消息的消费。一个Stream代表了多个Partition消息的聚合,但是每一个Partition只能映射到一个Stream。
消息的最终获取是通过遍历KafkaStream的迭代器ConsumerIterator来逐条获取的,其数据来源于分配给该KafkaStream的阻塞消息队列BlockingQueue,而BlockingQueue的数据来源针对每个Broker Server的FetchThread线程。FetchThread线程会将Broker Server上的部分Partition数据发送给对应的阻塞消息队列BlockingQueue,而KafkaStream正是从该阻塞消息队列BlockingQueue中不断的消费消息。
ConsumerThread本质上是客户端的消费线程,消费若干个Partition上的数据,并且与BlockingQueueu相互映射,只要确定了ConsumerThread和Partition之间的关系,也就确定了BlockingQueue和Partition之间的关系。Kafka提供了两种ConsumerThread和Partition的分配算法Range(范围分区分配)和RoundRobin(循环分区分配)
高级消费者中,每个具体消费者实例启动之后会在/consumers/[group]/ids/的Zookeeper目录下注册自己的id;Kafka集群内部Topic会在/brokers/topics/[topic]/的Zookeeper目录下注册自己的Partition,因此消费者实例一旦发现以上2个路径的数据发生变化时,则会触发高级消费者的负载均衡流程,除此之外,消费者实例一旦和Zookeeper的链接重新建立时也会触发高级消费者的负载均衡流程。
高级消费者内部针对Zookeeper的连接建立、Topic的Partition变化、Consumer的新增会建立3个不同的Listener,分别是ZKSessionExpireListener、ZKTopicPartitionChangeListener和ZKRebalancerListener。
高级消费者消费消息时提供了两种持久化偏移量的机制,由参数auto.commit.enable,默认为true自动提交。否则需要手动调用ZookeeperConsumerConnector的commitOffsets。Kafka根据参数offsets.storage,默认为zookeeper(保存路径为/consumers/[group]/offset/[topic]/[partition]),可以设置为kafka(保存再Topic为“__consumer_offsets”的日志中)。高级消费者内部会自动间隔一定时间(由参数 auto.commit.interval.ms决定,默认60*1000ms)
一、顺序读写磁盘,充分利用了操作系统的预读机制。
二、linux中使用sendfile命令,减少一次数据拷贝,如下。
①把数据从硬盘读取到内核中的页缓存。
②把数据从内核中读取到用户空间。(sendfile命令将跳过此步骤)
③把用户空间中的数据写到socket缓冲区中。
④操作系统将数据从socket缓冲区中复制到网卡缓冲区,以便将数据经网络发出
三、生产者客户端缓存消息批量发送,消费者批量从broker获取消息,减少网络io次数,充分利用磁盘顺序读写的性能。
四、通常情况下kafka的瓶颈不是cpu或者磁盘,而是网络带宽,所以生产者可以对数据进行压缩。
消息丢失解决方案:
首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功
消息重复解决方案:
消息可以使用唯一id标识
生产者(ack=all 代表至少成功发送一次)
消费者 (offset手动提交,业务逻辑成功处理后,提交offset)
落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)
幂等producer:保证发送单个分区的消息只会发送一次,不会出现重复消息
事务(transaction):保证原子性地写入到多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚流处理EOS:流处理本质上可看成是“读取-处理-写入”的管道。此EOS保证整个过程的操作是原子性。注意,这只适用于Kafka Streams
一致性定义:若某条消息对client可见,那么即使Leader挂了,在新Leader上数据依然可以被读到
HW-HighWaterMark: client可以从Leader读到的最大msg offset,即对外可见的最大offset, HW=max(replica.offset)
对于Leader新收到的msg,client不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被client消费,这样就保证了如果Leader fail,该消息仍然可以从新选举的Leader中获取。
对于来自内部Broker的读取请求,没有HW的限制。同时,Follower也会维护一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)
当Producer向Leader发送数据时,可以通过acks参数设置数据可靠性的级别
0: 不论写入是否成功,server不需要给Producer发送Response,如果发生异常,server会终止连接,触发Producer更新meta数据;
1: Leader写入成功后即发送Response,此种情况如果Leader fail,会丢失数据
-1: 等待所有ISR接收到消息后再给Producer发送Response,这是最强保证
spark RDD内部机制可以保证数据at-least语义。
Receiver方式
开启WAL(预写日志),将从kafka中接受到的数据写入到日志文件中,所有数据从失败中可恢复。
Direct方式
依靠checkpoint机制来保证。
要保证数据不重复,即Exactly once语义。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。