当前位置:   article > 正文

kafka详解

kafka详解

一、摘要

kafka的基本组成部分及其角色扮演;

kafka通过ISR和ack机制保证数据一致性与幂等性;

kafka控制器选举规则;

partition与consumer数据关系:consumer不能比partition多

Consumer消费Partition的分配策略:Range、RoundRobin

二、kafka介绍

2.1、组件介绍

kafka最初由Linkedin公司开发,是一个分布式、支持分区(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。

涉及的相关概念如下:

名称解释
Broker消息中间件处理节点,一个kafka节点就是一个broker,一个或多个Broker可以组成一个kafka集群。
Topickafka根据Topic对消息进行分类,发布到kafka集群的每条消息都需要指定一个Topic
Producer消息生产者,向Broker发送消息的客户端
Consumer消息消费者,从Broker读取消息的客户端
ConsumerGroup每个consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer消费,但是一个Consumer Group中只能有一个consumer能消费该消息
Partition物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的

2.2、partition特性

一个topic可以分多个partition,同topic下的不同partition包含的消息是不同的。每个消息分配到分区时都会分配一个offset,它是消息在此分区中的唯一编号,kafka通过offset保证在一个分区中的消息是有序的。

kafka通过ISR机制保持数据一致性。

1、kafka是多replica的,在kafka cluster中每个topic下的每个partition都有一个leader和多个follower;

2、follower跟kafka的读写性能无关,只是作为副本提供数据的可恢复性,只有leader与读写性能有关;

3、follower主动从leader中拉去数据。

2.3、副本与leader数据同步

如何确定副本与leader数据同步的呢?

ISR全称是In-Sync Replicas,IRS是一个副本列表,里面存储的都是能和leader数据一致的副本,判断一个副本是否在ISR中有两个判断条件,并通过一个周期调度来增加或删除ISR中的副本:

1、根据副本与leader的交互时间差,如果副本大于这个时间差没有和leader交互,就将该副本从ISR列表中剔除,配置参数为:replica.lag.time.max.ms=10000,单位是ms;

2、根据副本与leader的数据条数差决定是否将该follower从ISR中剔除,配置参数为:replica.lag.max.messages=4000,单位是条。

通过ISR和ack机制保证数据一致性以及保证幂等性(消息是否重复消费、发送等)。

配置参数:min.insync.replica=n,只有当n个副本确认了消息,才认为消息成功发送(n一般大于1,因为leader也在ISR列表中)。

min.insync.replica需配合request.required.acks=-1才能达到最大的可靠性。

request.required.acks参数说明:

0:生产者只管发送消息,不管服务器、消费者是否消费消息;

1:只有当leader确认收到消息,才确认此消息发送成功;

-1:只有当n-1个副本(leader也在isr中)都同步了消息,才确认消息成功发送。

2.4、触发分区重新选举情况

kafka集群有多个broker,其中有个扮演者Controller控制器的角色,它负责管理整个集群所有分区和副本的状态。

  1. 当某个分区的leader副本出现故障时,由控制器为该分区的副本选举新的leader;
  2. 当检测到某个分区的ISR集合发生变化时,由控制器通知所有Broker更新元数据;
  3. 当使用kafka-topics.sh为某个topic增加分区时,由控制器负责让新的分区被其他节点感知到。

2.5、保证数据不丢失的设置

Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader中pull数据。
ISR: in-sync-replica同步复制因子

生产者丢数据  producer.properties
#所有follwer同步完后,才认为是成功的
request.required.acks=-1/ALL
#允许消息重发的次数,默认是3
message.send.max.retries=3

消息队列丢数据 server.properties
#每个partition至少有多少个副本,一般>=3,参考了Hodoop的三备份原则
replication.factor=3
#只有当N个副本确定了消息,才算成功,与acks配合使用,replication.factor>min.insync.replicas

#如果leader挂掉,且这两个值相等,就会出问题
min.insync.replicas=2
#非ISR列表中的partition选举为leader
unclean.leader.election.enable=false

消费者丢数据 consumer.properties
#设置为手动提交,offset为每个partition中消息的下表,自动消费情况下默认从已消费的下表值+1开始消费
enable.auto.commit=false

#ISR剔除条件,如果超时或者消息数量相差值超过,则从ISR列表中删除
replica.lag.time.max.ms=10000,单位是ms
replica.lag.max.messages=4000,单位是条

三、consumer不能比partition多

partition与consumer数据关系:consumer不能比partition多

一个partition同一个时刻在一个consumer group中只能有一个consumer instance在消费,从而保证消费顺序。consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则,多出来的consumer消费不到消息。

一个partition中的消息只能被一个consumer group中消费者消费,但一个消费者可以消费多个partition中的消息。

四、Consumer消费Partition的分配策略

Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。

4.1、触发分区分配

  1. 同一个consumer group内新增消费者;
  2. 消费者离开当前所属的consumer group,包括shut down、crashes;
  3. 订阅的主题新增分区。

4.2、range strategy

将topic中分区按序号排序,将消费者按字母顺序排序,然后将分区数除以消费者数量,如果除不尽,则排在前面的消费者线程多消费一个分区。

比如分区数为5个,消费者两个C1,C2,分区和消费者排序后0,1,2,3,4,消费者C1一个线程C1-0,C2两个线程C2-0,C2-1,则分配如下:

C1-0:0, 1

C2-0: 2, 3

C2-1: 4

如果有两个topic(T1,T2),分别有5个分区,那么分配结果是

C1-0:消费T1主题的0, 1分区和T2的0, 1分区;

C2-0:消费T1主题的2, 3分区和T2的2, 3分区;

C2-1:消费T1主题的4分区和T2的4分区;

4.3、roundrobin strategy

使用round robin策略有两个前提条件:

1、同一个Consumer Group里面的所有消费者num.streams必须相等;

2、每个消费者订阅的主题必须相同

round robin策略的工作原理:将所有主题的分区组成为TopicAndPartition列表,然后对TopicAndPartition进行排序。

  1. val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
  2. partitions.map(partition => {
  3. TopicAndPartition(topic, partition)
  4. })
  5. }.toSeq.sortWith((topicPartition1, topicPartition2) => {
  6. topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
  7. })

然后按round robin风格将分区分配给不同的消费者线程。以上一个列子,每个消费者两个线程为例:

按照hashcode排序完的topic-partitions组依次是T1-4,T1-0,T1-3,T1-1,T1-2,消费者线程为C1-0,C1-1,C2-0,C2-1,最后分配结果为:

C1-0:T1-4,T1-2

C1-1:T1-0

C2-0:T1-3

C2-1:T1-1

五、kafka发送消息幂等性

kafka可通过ack确认及retries重试机制可保证消息发送的成功,但如果出现网络抖动、超时等情况,broker已收到消息但producer未收到确认,producer触发重试从而导致消息的重复。

5.1、单一partition消息幂等

kafka通过 幂等性(Idempotence)和事务(Transaction) 的机制。在0.11.0.0之前,kafka支持两种语义:At most once和At least once,之后提供了幂等的支持。

Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber

  • ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
  • SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。

当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。

对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。

相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

开启幂等的配置项:props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

5.2、多分区幂等性

5.2.1、使用KafkaProducer init、begin、commit/abort事务管理

数据的发送需要放在beginTransaction和commitTransaction之间。Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。示例代码:

  1. KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfigs(properties));
  2. producer.initTransactions();
  3. try {
  4. producer.beginTransaction();
  5. producer.send(record1);
  6. producer.send(record2);
  7. producer.commitTransaction();
  8. } catch (KafkaException e) {
  9. producer.abortTransaction();
  10. }

5.2.2、配置事务管理器+注解的方式

以KafkaTemplate.send()示例,在producerFactory中开启事务功能,KafkaTransactionManager是我们配置的事务管理器。并设置TransactionIdPrefix,TransactionIdPrefix是用来生成Transactional.id的前缀。

  1. @Bean
  2. public ProducerFactory<Integer, String> producerFactory() {
  3. DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(senderProps());
  4. factory.transactionCapable();
  5. factory.setTransactionIdPrefix("tran-");
  6. return factory;
  7. }
  8. @Bean
  9. public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
  10. KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
  11. return manager;
  12. }

开启事务

  1. @Test
  2. @Transactional
  3. public void testTransactionalAnnotation() throws InterruptedException {
  4. kafkaTemplate.send("topic.quick.tran", "test transactional annotation");
  5. throw new RuntimeException("fail");
  6. }

5.2.3、使用KafkaTemplate.executeInTransaction开启事务

  1. @Test
  2. public void testExecuteInTransaction() throws InterruptedException {
  3. kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
  4. @Override
  5. public Object doInOperations(KafkaOperations kafkaOperations) {
  6. kafkaOperations.send("topic.quick.tran", "test executeInTransaction");
  7. throw new RuntimeException("fail");
  8. //return true;
  9. }
  10. });
  11. }

六、kafka在zookeeper节点中的体现

kafka注册到zk的组件

1、Broker

连接zk创建临时节点,目录:/broker/ids。待zk创建此节点后,kafka会把这个broker的主机名和端口号记录到此节点;

2、Topic

当我们创建主题时,kafka先到/broker/ids获取当前存活的broker列表执行副本的分配方案,将方案写入/brokers/topic/<topic>节点下。更新ISR列表,目录为:/broker/topics/{topic}/partitions/{partition}/state

分配方案格式:{"version":1,"partitions":{"0":[0,1,2],"1":[1,0,2],"2":[2,0,1]}}

SR列表格式:{"controller_epoch":18,"leader":-1,"version":1,"leader_epoch":86,"isr":[]}"1":[1,0,2],"2":[2,0,1]}}

Consumer

当有新的消费者时,会注册到/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者会将自己订阅的topic信息写入该临时节点。

七、kafka如何实现高效和高吞吐量

页面缓存技术+磁盘顺序写+零拷贝技术

页面缓存技术:页缓存是linux内核中的一种重要的高速磁盘缓存,是计算机随机存取器RAM中的一块区域,主要负责用户空间与磁盘文件之间的高效读写。

磁盘顺序写:kafka是以追加的方式写到磁盘,保证磁盘数据连续紧凑,同时kafka以segment log file进行分段存储,每次访问磁盘文件时,只需寻址最后一个segment file的磁盘空间,所以能保证写入与读取效率的

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/502242
推荐阅读
相关标签
  

闽ICP备14008679号