赞
踩
一个经典的Kafka体系架构包括若干Producer、若干Broker、若干Consumer、以及一个Zookeeper集群。Zookeeper是Kafka用来负责群元数据的管理、控制器的选举等操作的。Producer将消息发送到Broker、Broker负责将受到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。
整体架构图
AR、ISR、OSR
分区中的所有副本统称为AR
(Assigned Replicas)。
所有与副本保持一定同步程度的副本组成ISR
(In-Sync Replicas)。
与leader副本同步滞后过多的副本组成OSR
(Out-of-Sync Replicas)。
AR = ISR +OSR。正常情况 应该AR=ISR,OSR集合为空。
HW、LEO
HW
(High Watermark)俗称高水位,表示了一个特定消息的偏移量,消费者只能拉取到这个offset之前的消息。
LEO
(Log End Offset)表示当前日志文件中下一条待写入消息的offset。
分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW
,对消费者而言只能消费HW之前的消息。
ProducerRecord
public class ProducerRecord<K, V> {
private final String topic; //主题
private final Integer partition; //分区号
private final Headers headers; //消息头部
private final K key; //键
private final V value; //值
private final Long timestamp; //消息的时间戳
...
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
producer.send(record);
producer.send(record).get();
producer.send(record, new Callback(){
```
});
生产者架构图
消息在通过send()方法发往broker的过程中,有可能需要经过拦截器
、序列化器
、和分区器
的一系列作用之后才能被真正地发往broker。
序列化
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka。消费者需要用反序列化区把从Kafka中收到的字节数组转换成相应的对象。自带的有StringSerializer,ByteArray、ByteBuffer、Bytes、Double、Integer、Long等,还可以自定义序列化器。
分区器
如果消息中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。也可以自定义分区器。
拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。通过自定义实现ProducerInterceptor接口来使用。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程
和Sender线程(发送线程)
。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator)
中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送
,进而减少网络传输的资源消耗以提升性能。RecordAccumulator的缓存大小可以通过buffer.memory
配置。
主线程发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Dqueue)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch
,即Dqueue< ProducerBatch >。一个ProducerBatch 包含多个ProducerRecord。
消息在网络上都是以字节的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。为避免频繁创建和释放内存,在RecordAccumulator内部有一个BufferPool来实现ByteBuffer的复用,不过只针对特定大小的ByteBuffer进行管理,这个大小由batch.size
参数来指定。
当一条消息(ProducerRecord)流入RecordAccumulator,如果这条消息小于batch.size参数大小则以batch.size参数大小创建ProducerBatch(可以通过ByteBuffer复用),否则以消息的实际大小创建ProducerBatch(不会通过ByteBuffer复用)。
后续Sender从缓存中获取消息,进行转换,发送到broker。在发送前还会保存到InFlightRequests
中,作用是缓存已经发送出去但还没有收到响应的请求,缓存数量由max.in.flight.requests.per.connection
参数确定,默认是5,表示每个连接最多缓存5个未响应的请求。
消费者(Consumer)负责订阅Kafka中的主体,在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组,消息只会投递给消费组中的一个消费者。
通过subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配政策来自动分配各个消费者与分区的关系,以实现消费者负载均衡和故障自动转移。而通过assign()方法则没有。
消费者要使用生产者序列化器对应的反序列化器。
Kafka中的消息是基于拉模式
的。Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。如果没有消息则返回空。
public ConsumerRecords<K, V> (final Duration timeout)
timeout用于控制poll()方法的阻塞时间,没有消息时会阻塞。
Kafka中的每条消息都有唯一的offset,用来标识消息在分区中对应的位置。
Kafka默认的消费唯一的提交方式是自动提交,由enable.auto.commit
配置,默认为true。自动提交不是每一条消息提交一次,而是定期提交,周期由auto.commit.interval.ms
配置,默认为5秒。
自动提交可能发生消息重复或者丢失的情况,Kafka还提供了手动提交的方式。enable.auto.commit
配置为false开启手动提交。
auto.offset.reset
在Kafka中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset
的配置来决定从何处开始进行消费。默认值为lastest,表示从分区末尾开始消费消息;earliest表示从起始开始消费;none为不进行消费,而是抛出异常。
seek()
seek()可以从特定的位移处开始拉去消息,得以追前消费或回溯消费。
public void seek(TopicPartition partition, long offset)
再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。
不过在再均衡发生期间,消费组内的消费者是无法读取消息的。再均衡后也可能出现重复消费的情况。所以应尽量避免不必要的再均衡发生。
消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。
KafkaProducer是线程安全的,然而KafkaConsumer确实非线程安全的。KafkaConsumer中定义了一个acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程在操作则会抛出异常。KafkaConsumer中的每个公用方法在执行前都会调用acquire()方法,除了wakeup()。
acquire()不会造成阻塞等待,我们可以将其看做一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示响应的加锁和解锁操作。
多线程实现方式:
滑动窗口
的方案优化。如果broker端配置参数auto.create.topics.enable
设置为true(默认为true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions
(默认为1)、副本因子为default.replication.factor
(默认值为1)的主题。
分区和分区副本都对应一个日志文件,不是分区数越多吞吐量就越大,超过阈值会使Kafka报错或系统崩溃。
分区只能增加不能减少。
一个分区对应一个日志文件(Log)
,为了防止Log过大,Kafka又引入了日志分段(LogSegment)
的概念,将Log切分为多个LogSegment,便于消息的维护和清理。
Log在物理上只以(命名为topic-partitiom)文件夹的形式存储,而每个LogSegment对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(如以“.txnindex”为后缀的事物索引文件)。
v0、v1
v2
一条消息通常不会太大,Kafka是批量消息压缩,通过compression.type
配置默认为producer,还可以配置为gzip、snappy、lz4,uncompressed表示不压缩。
Kafka中的索引文件以稀疏索引的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(log.index.interval.bytes
指定,默认4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引文件项和时间戳索引文件项。稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。
Kafka提供两种日志清理策略:
顺序写磁盘
存储介质速度:磁盘、磁带、主存、缓存、寄存器
写入速度:随机写磁盘、随机写内存、顺序写磁盘、顺序写内存
所以Kafka的顺序写磁盘的速度不一定比写内存慢
。
页缓存
页缓存是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问,减少对磁盘IO的操作。
当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页是否在页缓存中,如果存在则直接返回数据,如果没有则向磁盘中读取并存入页缓存;写入磁盘也是先写入页缓存,被修改过的页变成了脏页,操作系统会在合适的时间将数据写入磁盘。
Kafka也提供了同步刷盘以及间断性强制刷盘(fsync)的功能,这些功能可以通过log.flush.interval.message
、log.flush.interval.ms
等参数控制。
同步刷盘可以提高消息的可靠性,防止机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失,不过不建议这么做,刷盘任务应该交由操作系统去调配,消息可靠性应该由多副本机制来保障
。
零拷贝
所谓的零拷贝是将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。减少了数据拷贝的次数和内核和用户模式之间的上下文切换。对于Linux操作系统而言,底层依赖于sendfile()方法实现。
一般的数据流程:磁盘 -> 内核 -> 应用 -> Socket -> 网卡,数据复制4次,上下文切换4次。
零拷贝技术通过DMA技术直接将文件内容复制到内核模式下的Read Buffer中。不过没有数据被复制到Socket Buffer,相反只有包含数据的位置和长度的信息的文件描述符(fd)
被加到Socket Buffer中。DMA引擎直接将数据从内核模式中传递到网卡设备。这里的数据只经历了2次复制就从磁盘中传送出去了,并且上下文切换也变成了2次。零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝
。
DMA 传输将数据从一个地址空间复制到另外一个地址空间。当CPU 初始化这个传输动作,传输动作本身是由 DMA 控制器来实行和完成。因此通过DMA,硬件则可以绕过CPU,自己去直接访问系统主内存。很多硬件都支持DMA,其中就包括网卡、声卡、磁盘驱动控制器等。
(服务端、客户端、可靠性、高级应用 略)
#################producer的配置参数(开始)################# #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下: #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。 #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。 #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。 #可以设置的值为:all, -1, 0, 1 spring.kafka.producer.acks=1 #每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求, #这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384 spring.kafka.producer.batch-size=16384 #以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接 spring.kafka.producer.bootstrap-servers #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432 spring.kafka.producer.buffer-memory=33554432 #ID在发出请求时传递给服务器,用于服务器端日志记录 spring.kafka.producer.client-id #生产者生成的所有数据的压缩类型,此配置接受标准压缩编解码器('gzip','snappy','lz4'), #它还接受'uncompressed'以及'producer',分别表示没有压缩以及保留生产者设置的原始压缩编解码器, #默认值为producer spring.kafka.producer.compression-type=producer #key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer #值的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #如果该值大于零时,表示启用重试失败的发送次数 spring.kafka.producer.retries #################producer的配置参数(结束)################# #################consumer的配置参数(开始)################# #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。 spring.kafka.consumer.auto-commit-interval; #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量 #可选的值为latest, earliest, none spring.kafka.consumer.auto-offset-reset=latest; #以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接。 spring.kafka.consumer.bootstrap-servers; #ID在发出请求时传递给服务器;用于服务器端日志记录。 spring.kafka.consumer.client-id; #如果为true,则消费者的偏移量将在后台定期提交,默认值为true spring.kafka.consumer.enable-auto-commit=true; #如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器在回答获取请求之前将阻塞的最长时间(以毫秒为单位) #默认值为500 spring.kafka.consumer.fetch-max-wait; #服务器应以字节为单位返回获取请求的最小数据量,默认值为1,对应的kafka的参数为fetch.min.bytes。 spring.kafka.consumer.fetch-min-size; #用于标识此使用者所属的使用者组的唯一字符串。 spring.kafka.consumer.group-id; #心跳与消费者协调员之间的预期时间(以毫秒为单位),默认值为3000 spring.kafka.consumer.heartbeat-interval; #密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer #值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #一次调用poll()操作时返回的最大记录数,默认值为500 spring.kafka.consumer.max-poll-records; #################consumer的配置参数(结束)################# #################listener的配置参数(结束)################# #侦听器的AckMode,参见https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效 spring.kafka.listener.ack-mode; #在侦听器容器中运行的线程数 spring.kafka.listener.concurrency; #轮询消费者时使用的超时(以毫秒为单位) spring.kafka.listener.poll-timeout; #当ackMode为“COUNT”或“COUNT_TIME”时,偏移提交之间的记录数 spring.kafka.listener.ack-count; #当ackMode为“TIME”或“COUNT_TIME”时,偏移提交之间的时间(以毫秒为单位) spring.kafka.listener.ack-time; #################listener的配置参数(结束)#################
用途
系统解耦、流量削峰、消息持久化、消息顺序性保障
kafka为什么快/吞吐量大
如何保证可靠性/不丢失
send()
,可以在回调方法中处理发送失败的问题。比如重试或者写进数据库等。acsk=-1
,可以将数据同步到所有副本后才返回成功。修改min.insync.replicas
参数配合ack使用,指定了ISR中最小同步副本数,避免ISR中的副本宕机导致同步副本数过小。(这样也会影响顺序性和吞吐量)retries
的次数可以提高,提交生产者的缓冲区内存等。enable.auto.commit=false
改为手动提交。如何保证不重复消费/幂等
全局id去重
如何保证顺序性、及扩容
Kafka的分区顺序保证的,如果是全局顺序应使用一个分区。
如果不是全局顺序,且数据量大可以使用hash将数据发送到不同的分区处理。扩容可以通过增加分区数。
消息堆积
RabbitMQ、Kafka、RocketMQ对比
RabbitMQ的数据可靠性和集群可用性高,也支持一些高级应用,比如死信队列和重试队列。适合对数据一致性、稳定性和可靠性要求很高的场景。
Kafka最初用于日志收集,追求高吞吐,但是对消息可靠性要求不高,需要开发者做一些处理提高可靠性,也没有自带的高级应用,需要开发者自己实现。
RocketMQ参考Kafka的进行开发的,吞吐性和可靠性都比较高,适合大规模分布式系统应用,也支持高级应用,如事务消息等,但是社区活跃度不如上面两种。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。