当前位置:   article > 正文

总结:kafka_kafka总结

kafka总结

一、Kafka整体架构

Apache Kafka 是分布式发布-订阅消息系统。

下面这张架构图可能更简单清晰点:

从代码层面,我们的生产者和消费者不需要和zookeeper建立连接,因为Kafka Server端在启动的时候会默认和Zookeeper建立连接

场景选择

主要是从业务的角度出发,生产端与消费端进行合理的配置来适用不同的业务场景。

Kafka不太适用到实时线上业务中。

由于Kafka很难避免数据丢失以及不进行消息重复性检查,所以Kafka适合对消费重复数据有幂等性以及异常情况下数据丢失的场景,当然正常情况下不会有数据的丢失。如果不接受以上场景可以考虑ActiveMQ,RocketMQ。

同时,kafka 一般用于日志收集的离线系统,对于实时性要求高的业务可以考虑ActiveMQ或者使用kafka服务时,使用流式计算进行及时消费。

二、kafka中的概念

  • Producer

消息生产者,负责发布消息到Topic

  • Topic

消息主题,类似于队列概念,一个Topic可以有多个Partition,不同的Partition分布在不同的Broker上。每个Partition有多个replica(默认3个,1个leader 副本,2个follower 副本),每个Leader Partition都有它自己独立的ISR,ISR负责去管理replica。

  • Broker

    Kafka集群中的一台或多台服务器统称broker。 Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。

  • Consumer

消息消费者,向Kafka broker读取消息的客户端。

  • Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。Consumer Grpup 消费全量的消息,每个Consumer消费其中一部分partitionPartition

  • 是Kafka中横向扩展和一切并行化的基础,每个Topic都至少被切分为1个Partition。每个Partion是存储有序的消息。Partion中每条消息都会被分配一个 有序的Id(offset),同一 topic 下的不同分区包含的消息是不同的。一个消费者消费一个Partion的数据是有序的,但是如果topic分配的partion数量大于1的时候,消费者去消费topic消息的时候就不能保证有序了。

  • Replica

    partition 的副本,保障 partition 的高可用, 理论上replica值越大,partition 高可用率越高,消息冗余越高。但同时会降低整体吞吐。默认为3个副本。1个leader 副本,2个follower 副本

  • Lead replica

    每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。其他Replicas从Leader处把数据更新同步到本地。

  • ISR

    In-Sync Replica,是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。由于读写都是首先落到Leader上,所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个Leader Partition都有它自己独立的ISR

  • Offset

    Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message

  • Log Segment

    partition是分段的,每个段叫Log Segment,包括了一个数据文件和一个索引文件。如下所示,是一个partition目录下的log segment 显示:

    数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件。数据文件是实际业务生产的消息的存储文件。默认数据文件是1G大小,超过1G后会roll out出一个新的数据文件。

三、消息模型

通常来讲,消息模型可以分为两种, 队列和发布-订阅式。

队列的处理方式是 一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。

在发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。

四、消费者组与消费者

  • 我们平时启动一个消费者实例的时候,其实就是一个消费者;
  • 这个实例代码中,必须要指定消费者组;
  • 如果这块代码启动多个实例,相当于在一个消费者组中启动了多个消费者;
  • 同时代码中可以指定消费哪些topic,可以是多个topic;
  • kafka中partition和消费者对应关系

五、kafka高可用机制

背景

  kafka 0.8以前,是没有HA机制的,就是任何一个broker宕机了,那个broker上的partition就废了,没法写也没法读,没有什么高可用性可言。

  kafka 0.8以后,提供了HA机制,就是replica副本机制。每个partition的数据都会同步到其他机器上,形成自己的多个replica副本。然后所有replica会选举一个leader出来,那么生产和消费都跟这个leader打交道,然后其他replica就是follower。写的时候,leader会负责把数据同步到所有follower上去,读的时候就直接读leader上数据即可。只能读写leader?很简单,要是你可以随意读写每个follower,那么就要care数据一致性的问题,系统复杂度太高,很容易出问题(Tidb是提供副本读机制的,缺点是需要考虑主副本一致性,优点是提高了读性能)。kafka会均匀的将一个partition的所有replica分布在不同的机器上,这样才可以提高容错性。

  kafka的这种机制,就有所谓的高可用性了,因为如果某个broker宕机了,也没事儿,因为那个broker上面的partition在其他机器上都有副本的,那么此时会重新选举一个新的leader出来,大家继续读写那个新的leader即可。这就有所谓的高可用性了。

数据备份详解

  • 数据分布式存储

对于每一个Topic,我们都可以设置他包含几个Partition,每个Partition负责存储这个Topic一部分的数据。 然后Kafka的Broker集群中,每台机器上都存储了一些Partition,也就存放了Topic的一部分数据,这样就实现了Topic的数据分布式存储在一个Broker集群上。 但是有一个问题,万一 一个Kafka Broker宕机了,此时上面存储的数据不就丢失了吗?

没错,这就是一个比较大的问题了,分布式系统的数据丢失问题,是他首先必须要解决的,一旦说任何一台机器宕机,此时就会导致数据的丢失。

  • 多副本冗余的高可用机制

所以如果大家去分析任何一个分布式系统的原理,比如说zookeeper、kafka、redis cluster、elasticsearch、hdfs,等等,其实他都有自己内部的一套多副本冗余的机制,多副本冗余几乎是现在任何一个优秀的分布式系统都一般要具备的功能

在kafka集群中,每个Partition都有多个副本,其中一个副本叫做leader(说明leader也是副本),其他的副本叫做follower,如下图。

如上图所示,假设一个Topic拆分为了3个Partition,分别是Partition0,Partiton1,Partition2,此时每个Partition都有2个副本。

比如Partition0有一个副本是Leader,另外一个副本是Follower,Leader和Follower两个副本是分布在不同机器上的。

这样的多副本冗余机制,可以保证任何一台机器挂掉,都不会导致数据彻底丢失,因为起码还是有副本在别的机器上的。

  • 多副本之间数据如何同步?

接着我们就来看看多个副本之间数据是如何同步的?其实任何一个Partition,只有Leader是对外提供读写服务的

也就是说,如果有一个客户端往一个Partition写入数据,此时一般就是写入这个Partition的Leader副本。

然后Leader副本接收到数据之后,Follower副本会不停的给他发送请求尝试去拉取最新的数据,拉取到自己本地后,写入磁盘中。

  • ISR到底指的是什么东西?

既然大家已经知道了Partiton的多副本同步数据的机制了,那么就可以来看看ISR是什么了。

ISR全称是“In-Sync Replicas”,也就是保持同步的副本,他的含义就是,跟Leader始终保持同步的Follower有哪些。

大家可以想一下 ,如果说某个Follower所在的Broker因为JVM FullGC之类的问题,导致自己卡顿了,无法及时从Leader拉取同步数据,那么是不是会导致Follower的数据比Leader要落后很多?

所以这个时候,就意味着Follower已经跟Leader不再处于同步的关系了。但是只要Follower一直及时从Leader同步数据,就可以保证他们是处于同步的关系的。

所以每个Partition都有一个ISR,这个ISR里一定会有Leader自己,因为Leader肯定数据是最新的,然后就是那些跟Leader保持同步的Follower,也会在ISR里。
 

六、持久化

  • 数据持久化

发现线性的访问磁盘(即:按顺序的访问磁盘),很多时候比随机的内存访问快得多,而且有利于持久化;
传统的使用内存做为磁盘的缓存
Kafka直接将数据写入到日志文件中,以追加的形式写入

  • 日志数据持久化特性

写操作:通过将数据追加到文件中实现
读操作:读的时候从文件中读就好了

  • 优势

读操作不会阻塞写操作和其他操作(因为读和写都是追加的形式,都是顺序的,不会乱,所以不会发生阻塞),数据大小不对性能产生影响;
没有容量限制(相对于内存来说)的硬盘空间建立消息系统;
线性访问磁盘,速度快,可以保存任意一段时间!

  • 持久化的具体实现

  • 索引

为数据文件建索引:
稀疏存储,每隔一定字节的数据建立一条索引(这样的目的是为了减少索引文件的大小)。

下图为一个partition的索引示意图:

七、网络通信协议

作为一个消息队列,涉及的网络通信主要有两块:

  • 消息生产者与消息队列服务器之间(Kafka 中是生产者向队列「推」消息)
  • 消息消费者与消息队列服务器之间(Kafka 中是消费者向队列「拉」消息),但是消息量较小的时候,频繁拉取而又无数据的情况下,会造成不必要的性能浪费和带宽的浪费,因此kafka采用long poll的方式,即长轮询,意思是客户端拉取数据的时候,若服务端没有数据则阻塞,知道服务端有数据了再响应给客户端。但是长轮询也是有风险的,比如broker宕机了,客户端就一直等待了,所以最好设置个长轮询超时时间。

通信协议选择

要实现上述的网络通信,我们可以使用 HTTP 协议,比如服务端内嵌一个 jetty 容器,通过 servlet 来实现客户端与服务端之间的交互,但是其性能存在问题,无法满足高吞吐量这个需求。要实现高性能的网络通信,我们可以使用更底层的 TCP 或者 UDP 来实现自己的私有协议,而 UDP 协议是不可靠的传输协议,毕竟我们不希望一条消息在投递或者消费途中丢失了,所以 Kafka 选择 TCP 作为服务间通讯的协议

消息异步处理

Kafka 的生产者同时实现了同步和异步两种类型的客户端(即:向服务端发完请求后可以一直等待响应也可以继续干后面的事),其异步客户端实现方式是通过线程池加回调函数。Kafka 的服务端使用了 NIO 的 IO 多路复用技术,是非阻塞的 IO, kafka 的早期版本中,服务端是通过同步的方式处理客户端请求,最新版本是通过异步的方式进行的。Kafka 自带的消费者是通过同步阻塞的方式进行数据拉取的,当然如果需要异步处理,可以自己另外写一个异步消费者。

八、优秀设计理念

1、生产者的send()方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。

2、 long poll 设计。客户端没有拉取到消息的时候就阻塞等待,知道有消息。

3、零拷贝技术应用:是指内核空间和用户空间的交互拷贝次数为0

常规IO操作:

  1. 操作系统将数据从磁盘读入到内核空间的也缓存
  2. 应用程序将数据从内核空间读入到用户空间(应用程序不能直接操作内核空间
  3. 应用程序将数据写回到内核空间的Socket缓存中
  4. 操作系统将数据从Socket缓冲区读入到网卡缓冲区,以便将数据经过网络发出

Kafka零拷贝技术应用后的流程:

  1. 操作系统将数据从磁盘读入到内核空间的也缓存
  2. 将数据的位置和长度的信息的描述符增加至Socket缓冲区
  3. 操作系统将数据从Socket缓冲区读入到网卡缓冲区,以便将数据经过网络发出

实现方式:

Java本身提供的channel.transferTo()方法

底层是调用Linux的sendfile

sendfile 系统调用在 Linux 内核版本 2.1 中被引入,目的是简化通过网络在两个通道之间进行的数据传输过程。 通过 sendfile 系统调用,数据可以直接在内核空间内部进行 I/O 传输,从而省去了数据在用户空间和内核空间之间的来回拷贝。

九、Ack机制

acks参数,是在KafkaProducer,也就是生产者客户端里设置的

也就是说,你往kafka写数据的时候,就可以来设置这个acks参数。然后这个参数实际上有三种常见的值可以设置,分别是:0、1 和 all。

  • 第一种选择是把acks参数设置为0,意思就是我的KafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,我就不管他了,直接就认为这个消息发送成功了。
  • 第二种选择是设置 acks = 1,意思就是说只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。

    这种设置其实是kafka默认的设置,大家请注意,划重点!这是默认的设置

    也就是说,默认情况下,你要是不管acks这个参数,只要Partition Leader写成功就算成功。

  • 最后一种情况,就是设置acks=all,这个意思就是说,Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。

思考

acks=all 就可以代表数据一定不会丢失了吗?

当然不是,如果你的Partition只有一个副本,也就是一个Leader,任何Follower都没有,你认为acks=all有用吗?当然没用了,因为ISR里就一个Leader,他接收完消息后宕机,也会导致数据丢失。所以说,这个acks=all,必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以。这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失。

插曲:spring for kafka的AckMode,ack模式有多种,分别如下:

  • RECORD
    每处理一条commit一次
  • BATCH(默认)
    每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME
    每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
  • COUNT
    累积达到ackCount次的ack去commit
  • COUNT_TIME
    ackTime或ackCount哪个条件先满足,就commit
  • MANUAL(这种模式需要在代码证手动确认)
    listener负责ack,但是背后也是批量上去
  • MANUAL_IMMEDIATE
    listner负责ack,每调用一次,就立即commit

  • 接下来问题来了, 如果代码中没有进行ack.acknowledge(),会怎么办呢??

消费者在消费消息的过程中,配置参数设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?

1. 如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。

2. 如果在消费的过程中有几条或者一批数据数据没有提交offset,后面其他的消息消费后正常提交offset,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。

3. 消费者如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你不想提交offset的消息时你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。

参考链接:

kafka生产者客户端

Kafka的ack参数详解

Kafka-数据持久化

关于OS Page Cache的简单介绍

聊聊spring for kafka的AckMode

Kafka中文文档

Kafka 设计详解之网络通信

Kafka 源码解析之 Server 端如何处理 Produce 请求(十二)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/78620
推荐阅读
相关标签
  

闽ICP备14008679号