赞
踩
Kafka是是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写,它是一种分布式的,高吞吐量(支持每秒数百万),基于发布/订阅的轻量级消息系统,它支持分区(partition)、多副本(replica),可基于zookeeper完成分布式的协调一致管理,可实时处理海量消费者在网站中的所有动作流数据。kafka用于构建实时数据管道和流应用程序,具有横向扩展,容错,wicked fast(变态快)等优点;Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。多用于处理海量数据,对实时性要求高的场景,例如:基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等。
Kafka官方地址: http://kafka.apache.org/
场景案例:
1>日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
2>消息系统:解耦和生产者和消费者、缓存消息等。
3>用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
4>运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。5 流式处理:比如spark streaming和stormKafka拓扑与流程.
主要设计目标:
1>提供消息持久化能力,并保证即使对TB级以上数据也能保证常数时间的访问性能;
2>高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输;
3>支持Kafka Server间的消息分区,及分布式消息消费,同时保证每个partition内的消息顺序传输;
4>同时支持离线数据处理和实时数据处理;
产品特性:
1>完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式;
2>支持Hadoop 数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,又要求实时处理的场景,kafka是一个可行的解决方案;
3>Kafka通过Hadoop的并行 加载机制实现在线和离线消息的统一处理;
4>新版本kafka在Linux采用epoll机制实现设计底层网络库的ava的Select模型,因此跑在Linux上效率更高,因为epoll取消了轮询机制,换成了回调机制,当底层连接socket数较多时,可以避免CPU的时间浪费。
5>网络传输效率方面:kafka通过网络和磁盘进行数据传输时,Linux上它会调用sendFile系统调用,即零拷贝(Zero Copy 技术),避免了数据在内核地址空间和用户程序空间进行重复拷贝,从而降低了传输开销。
关于图中kafka的相关组件说明如下:
1>Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker(中间服务器);kafka将作为一个集群运行在一个或多个服务器上,kafka环境中所有已发布的消息保存在这组服务器中,也就是Kafka集群中。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),然后就可以从Broker上获取(消费到)数据,从而消费这些已发布的消息。可理解为货物的柜台,上面的“长格子(队列存储)”保存了producer生产的“鸡蛋”。
2>Topic:每条发布到Kafka集群的消息都要有一个类别,这个类别被称为topic,官方解释:kafka将消息种子(Feed)分门别类。每一类消息属于一个主题(Topic)。kafka集群存储的消息是以topic为类别记录的。 (物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处),也可通俗理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。实际topic标签实际就是队列,生产者把所有“鸡蛋(消息)”都放到对应的队列里了,消费者到指定的队列里取。每条消息(也叫记录record)是由一个key,一个value和时间戳构成的。
3>Partition:是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件;Kafka 将 Topic 进行分区,分区可以并发读写。每个分区会映射到一个逻辑的日志(log)文件:每当一个message被发布到一个topic上的一个partition上,broke对应会将该message追加到这个逻辑log文件的最后一个segment上,这些segments 会被flush到磁盘上。一个Topic的多个分区分布在不同borker上,这些分区所在的broker总是处于2种状态的:leader和follower,lead来表明当前分区在哪个broker可供consumer/producer读写,Producer和Consumer只与这个Leader交互,其他followe只同步该leader-broker上的分区,做容灾备份使用,不负责读写,不对外提供服务;
Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。如果这个集合有增减,kafka会更新zookeeper上的记录。如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。这样通过ISR,kafka集群可维持的冗余度较低,可以容忍的失败数比较高。假设某个topic有f+1个副本,kafka可以容忍f个服务器不可用。
4>Producer:负责发布消息到Kafka broker,它的角色是主题生产者(Kafka topic producer);也就是他是来生产“鸡蛋”的生产者;生产者往某个Topic上发布(push)消息。生产者也负责选择发布到Topic上的哪一个分区。最简单的方式从分区列表中轮流选择。也可以根据某种算法依照权重选择分区。producer生产消息需要如下参数:
topic:往哪个topic生产消息。
partition:往哪个partition生产消息。
key:根据该key将消息分区到不同partition。
message:消息。
5>Consumer:消费者,生出的“鸡蛋”它来消费,消费消息。每个consumer属于一个特定的消费者组consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group,消费者用一个消费者组名标记自己)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的 Topic 下面的消息,互不干扰。
消费者消费消息时有2种模式:1)队列(queue)和2)发布-订阅式;
1)队列的处理方式是: 一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。
2)发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息;当一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了queue模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。
如上图所示,kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高), 若干consumer group,以及一个Zookeeper集 群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从 broker订阅并消费消息。
● Push和pull模式:
作为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
● Topic 和Partition
Topic在逻辑上可以被认为是一个个queue,每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。 为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。
上述架构中,也包含了:
生产者(Producer): 消息和数据生产者;
代理(Broker): 缓存代理,Kafka的核心功能;
消费者(Consumer): 消息和数据消费者;
以上这三个角色是kafka架构必备的。
Kafka给Producer和Consumer提供注册的接口,数据从Producer发送到Broker,Broker承担一个中间缓存和分发的作用,负责分发注册到系统中的Consumer。在实际应用生产中会产生各种不同类型的数据,为它们设置不同的主题。一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生产者写入的新消息。
kafka为每个主题维护了分布式的分区(partition)日志文件,每个partition在kafka存储层面是append log。任何发布到此partition的消息都会被追加到log文件的尾部,在分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,也就是我们的offset,offset是一个long型的数字,我们通过这个offset可以确定一条在该partition下的唯一消息。这样,在partition下实现了保证消息的有序性,但是在topic下没有有序性。由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制。它将每个 Partition 分为多个 Segment,每个 Segment 对应两个文件:“.index” 索引文件和 “.log” 数据文件。这些文件位于同一文件下,该文件夹的命名规则为:topic 名-分区号。
上图中,三个分区,生产者会决定发送到哪个Partition,消息每次到来会写到分区的尾部,并分配到一个offset;如果没有Key值则进行轮询发送。如果有Key值,对Key值进行Hash**,然后对分区数量取余,保证了同一个Key值的会被路由到同一个分区,如果想保证队列的强顺序一致性,可以让所有的消息都设置为同一个Key。
kafka有四个核心API:
1>应用程序使用 Producer API 发布消息到1个或多个topic(主题)。
2>应用程序使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。
3>应用程序使用 Streams API 充当一个流处理器,从1个或多个topic消费输入流,并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。
4>Connector API允许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,一个关系数据库的连接器可捕获每一个变化。
上图中,我们把消费者组看作为一个逻辑上的订阅者。每个组包含数目不等的消费者, 一个组内多个消费者可以用来扩展性能和容错。
Kafka高效的原因:
1>直接使用Linux文件系统的Cache来高效缓存数据
2>采用Linux Zero-Copy提高发送性能。传统的数据发送需要发送4次上下切换,采用Sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次。可以提高60%的数据发送性能.
使用场景:
1)Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过kafka并没有提供JMS中的事务性消息传输担保(消息确认机制)消息分组等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)
2)Websit activity tracking
kafka可以作为"网站活性跟踪"的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等
3)Log Aggregation
kafka的特性决定它非常适合作为日志收集中心;application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.
消息到达kafka集群后,消费者订阅后会抄送给消费者消费,消费模型有两种:推送模型(psuh)和拉取模型(pull):
1)推送模型(push): 基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。比如当我们把已经把消息发送给消费者之后,由于消费进程挂掉或者由于网络原因没有收到这条消息,而我们在消费代理将其标记为已消费,这个消息就永久丢失了。如果我们利用生产者收到消息后回复这种方法,消息代理需要记录消费状态,这种更不可取,数据丢了确告知获取成功了。Push模式的优势在于消息实时性高。劣势在于没有考虑consumer消费能力和饱和情况,容易导致producer压垮consumer。
2)拉取模型(pull):push方式中,消息消费的速率就完全由消费代理控制,一旦消费者发生阻塞,就会出现问题。Pull模式中,当服务端收到这条消息后什么也不做,只是等着 Consumer 主动到自己这里来读,即 Consumer 这里有一个“拉取”的动作。Kafka采取拉取模型(poll),由自己控制消费速度,以及消费的进度,消费者可以按照任意的偏移量进行消费。比如消费者可以消费已经消费过的消息进行重新处理,或者消费最近的消息等等。
3)场景化分析 Push 模型和 Pull 模型的利弊
场景1: Producer 速率大于 Consumer 速率
当 Producer 速率大于 Consumer 速率时,有两种可能性:一种是 Producer 本身的效率就要比 Consumer 高(例如,Consumer 端处理消息的业务逻辑可能很复杂,或者涉及到磁盘、网络等 I/O 操作);另一种是 Consumer 出现故障,导致短时间内无法消费或消费不畅。
Push 方式由于无法得知当前 Consumer 的状态,所以只要有数据产生,便会不断地进行推送,在以上两种情况下时,可能会导致 Consumer 的负载进一步加重,甚至是崩溃(例如生产者是 flume 疯狂抓日志,消费者是 HDFS+hadoop,处理效率跟不上)。除非Consumer 有合适的反馈机制能够让服务端知道自己的状况。
而采取 Pull 的方式问题就简单了许多,由于 Consumer 是主动到服务端拉取数据,此时只需要降低自己访问频率即可。举例:如前端是 flume 等日志收集业务,不断向 CMQ 生产消息,CMQ 向后端投递,后端业务如数据分析等业务,效率可能低于生产者。
场景2: 强调消息的实时性
采用 Push 的方式时,一旦消息到达,服务端即可马上将其推送给消费端,这种方式的实时性显然是非常好的;而采用 Pull 方式时,为了不给服务端造成压力(尤其是当数据量不足时,不停的轮询显得毫无意义),需要控制好自己轮询的间隔时间,但这必然会给实时性带来一定的影响。
场景3: Pull 的长轮询
Pull 模式存在的问题:由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次 Pull 取到消息了还可以继续去 Pull,如果没有 Pull 取到消息则需要等待一段时间再重新 Pull。
由于等待时间很难判定。可能有很多种动态拉取时间调整算法,可能依然会遇到问题,是否有消息到来不是由消费方决定。也许1分钟内连续到来3000条消息,接下来的几分钟或几个小时内却没有新消息产生。
比如腾讯的CMQ就 提供了长轮询的优化方法,用以平衡 Pull/Push 模型各自的缺点。基本方式是:消费者如果尝试拉取失败,不是直接 return,而是把连接挂在那里 wait,服务端如果有新的消息到来,把连接拉起,返回最新消息。
场景4: 部分或全部 Consumer 不在线
在消息系统中,Producer 和 Consumer 是完全解耦的,Producer 发送消息时,并不要求 Consumer 一定要在线,对于 Consumer 也是同样的道理,这也是消息通信区别于 RPC 通信的主要特点;但是对于 Consumer 不在线的情况,却有很多值得讨论的场景。springboot集成的rabbitmq封装的就是push模式;
首先,在 Consumer 偶然宕机或下线时,Producer 的生产是可以不受影响的,Consumer 上线后,可以继续之前的消费,此时消息数据不会丢失;但是如果 Consumer 长期宕机或是由于机器故障无法再次启动,就会出现问题,即服务端是否需要为 Consumer 保留数据,以及保留多久的数据等。
采用 Push 方式时,因为无法预知 Consumer 的宕机或下线是短暂的还是持久的,如果一直为该 Consumer 保留自宕机开始的所有历史消息,那么即便其他所有的 Consumer 都已经消费完成,数据也无法清理掉,随着时间的积累,队列的长度会越来越大,此时无论消息是暂存于内存还是持久化到磁盘上(采用 Push 模型的系统,一般都是将消息队列维护于内存中,以保证推送的性能和实时性),都将对 MQ 服务端造成巨大压力,甚至可能影响到其他 Consumer 的正常消费,尤其当消息的生产速率非常快时更是如此;但是如果不保留数据,那么等该 Consumer 再次起来时,则要面对丢失数据的问题。
折中的方案是:MQ 给数据设定一个超时时间,当 Consumer 宕机时间超过这个阈值时,则清理数据;但这个时间阈值也并不太容易确定。
在采用 Pull 模型时,情况会有所改善;MQ服务端不再关心 Consumer 的状态,而是采取“你来了我才服务”的方式,Consumer 是否能够及时消费数据,服务端不会做任何保证(也有超时清理时间)。
中间件 | Push模型 | pull模型 |
---|---|---|
RabbitMQ | 支持 | 支持 |
kafka | – | 支持 |
RocketMQ | 支持(逻辑支持/假支持, 还是pull+长轮询/broker端阻塞) | 支持 |
java_ZeroMQ | 支持(Response) | 支持(Request端调用connect方法) |
AMQ | 支持 | — |
activemq | 支持 | 支持 |
flume | 支持 | 支持 |
NetMQ | 支持 | 支持 |
1)Kafka-Client --单线程Selector:适用于并发链接数小,逻辑简单,数据量小的场景.
上图中,consumer和producer都使用单线程模式,但这种模式不适合kafka的服务端,在服务端中请求处理过程比较复杂,会造成线程阻塞,一旦出现后续请求就会无法处理,会造成大量请求超时,引起雪崩。而在服务器中应该充分利用多线程来处理执行逻辑。
2)Kafka–server – 多线程Selector:
如上图示,在kafka服务端采用多线程的Selector模型,Acceptor运行在一个单独的线程中,对于读取操作的线程池中的线程都会在selector注册read事件,负责服务端读取请求的逻辑。成功读取后,将请求放入message queue共享队列中。然后在写线程池中,取出这个请求,对其进行逻辑处理,即使某个请求线程阻塞了,还有后续的线程从消息队列中获取请求并进行处理,在写线程中处理完逻辑处理,由于注册了OP_WIRTE事件,所以还需要对其发送响应。
3)kafkaControl选举
control为利用zookeeper选举机制从brokers中选举出来的broker。
监听broker变化,通过监听Zookeeper中的 /brokers/ids/ 节点方式来实现
监听topic变化,通过监听Zookeeper中的 /brokers/topics 节点方式来实现,实时监听topic变化
管理topic、partition、broker相关的信息
更新数据的元数据信息,同步到其他的broker节点
选举:
broker控制器选举的原理是借助于zookeeper的临时节点实现: kafka集群启动时,每个broker都会尝试争当控制器,都会往zookeeper的controller节点注册自己,但是由于zookeerper的特性,如果节点已经创建过,再创建就会失败,所以只会有一个broker创建成功,那么创建成功的broker就会成为控制器;此外其他broker都会监听这个controller节点。
由于controller是临时节点,当控制器broker挂机之后,就会断开与zookeeper的会话连接,临时节点也会消失,其它节点监听到controller节点消失后,就会重新争取controller节点。
另外,当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。
当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法。过程如下:
- zk读取并增加 Controller Epoch。
- kafka在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
- 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
- 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
- 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
- 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
- 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
- 启动 replicaStateMachine 和 partitionStateMachine。
- 将 brokerState 状态设置为 RunningAsController。
- 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
- 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
- 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
Kafka集群中被选举为控制器的broker,负责管理整个集群中所有分区和副本的状态。其职责有:
监听partition变化
维护分区的优先副本均衡
更新集群元数据
启动并管理partition状态机和replica状态机
从zk中读取当前与topic、partition、broker相关的信息并管理
监听broker相关变化
监听topic相关变化
4)kafka 在 zookeeper 中的存储结构
5)producer 写入消息流程序列图
流程说明:
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 新的 offset) 并向 producer 发送 ACK。
消费/拉消息流程:
6)Topic创建 和删除流程
创建流程说明:
- controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
- controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
1> 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
2>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。