赞
踩
kafka的基本组成部分及其角色扮演;
kafka通过ISR和ack机制保证数据一致性与幂等性;
kafka控制器选举规则;
partition与consumer数据关系:consumer不能比partition多
Consumer消费Partition的分配策略:Range、RoundRobin
kafka最初由Linkedin公司开发,是一个分布式、支持分区(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。
涉及的相关概念如下:
名称 | 解释 |
---|---|
Broker | 消息中间件处理节点,一个kafka节点就是一个broker,一个或多个Broker可以组成一个kafka集群。 |
Topic | kafka根据Topic对消息进行分类,发布到kafka集群的每条消息都需要指定一个Topic |
Producer | 消息生产者,向Broker发送消息的客户端 |
Consumer | 消息消费者,从Broker读取消息的客户端 |
ConsumerGroup | 每个consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer消费,但是一个Consumer Group中只能有一个consumer能消费该消息 |
Partition | 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的 |
一个topic可以分多个partition,同topic下的不同partition包含的消息是不同的。每个消息分配到分区时都会分配一个offset,它是消息在此分区中的唯一编号,kafka通过offset保证在一个分区中的消息是有序的。
kafka通过ISR机制保持数据一致性。
1、kafka是多replica的,在kafka cluster中每个topic下的每个partition都有一个leader和多个follower;
2、follower跟kafka的读写性能无关,只是作为副本提供数据的可恢复性,只有leader与读写性能有关;
3、follower主动从leader中拉去数据。
如何确定副本与leader数据同步的呢?
ISR全称是In-Sync Replicas,IRS是一个副本列表,里面存储的都是能和leader数据一致的副本,判断一个副本是否在ISR中有两个判断条件,并通过一个周期调度来增加或删除ISR中的副本:
1、根据副本与leader的交互时间差,如果副本大于这个时间差没有和leader交互,就将该副本从ISR列表中剔除,配置参数为:replica.lag.time.max.ms=10000,单位是ms;
2、根据副本与leader的数据条数差决定是否将该follower从ISR中剔除,配置参数为:replica.lag.max.messages=4000,单位是条。
通过ISR和ack机制保证数据一致性以及保证幂等性(消息是否重复消费、发送等)。
配置参数:min.insync.replica=n,只有当n个副本确认了消息,才认为消息成功发送(n一般大于1,因为leader也在ISR列表中)。
min.insync.replica需配合request.required.acks=-1才能达到最大的可靠性。
request.required.acks参数说明:
0:生产者只管发送消息,不管服务器、消费者是否消费消息;
1:只有当leader确认收到消息,才确认此消息发送成功;
-1:只有当n-1个副本(leader也在isr中)都同步了消息,才确认消息成功发送。
kafka集群有多个broker,其中有个扮演者Controller控制器的角色,它负责管理整个集群所有分区和副本的状态。
Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader中pull数据。
ISR: in-sync-replica同步复制因子
生产者丢数据 producer.properties
#所有follwer同步完后,才认为是成功的
request.required.acks=-1/ALL
#允许消息重发的次数,默认是3
message.send.max.retries=3
消息队列丢数据 server.properties
#每个partition至少有多少个副本,一般>=3,参考了Hodoop的三备份原则
replication.factor=3
#只有当N个副本确定了消息,才算成功,与acks配合使用,replication.factor>min.insync.replicas
#如果leader挂掉,且这两个值相等,就会出问题
min.insync.replicas=2
#非ISR列表中的partition选举为leader
unclean.leader.election.enable=false
消费者丢数据 consumer.properties
#设置为手动提交,offset为每个partition中消息的下表,自动消费情况下默认从已消费的下表值+1开始消费
enable.auto.commit=false
#ISR剔除条件,如果超时或者消息数量相差值超过,则从ISR列表中删除
replica.lag.time.max.ms=10000,单位是ms
replica.lag.max.messages=4000,单位是条
partition与consumer数据关系:consumer不能比partition多
一个partition同一个时刻在一个consumer group中只能有一个consumer instance在消费,从而保证消费顺序。consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则,多出来的consumer消费不到消息。
一个partition中的消息只能被一个consumer group中消费者消费,但一个消费者可以消费多个partition中的消息。
Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。
将topic中分区按序号排序,将消费者按字母顺序排序,然后将分区数除以消费者数量,如果除不尽,则排在前面的消费者线程多消费一个分区。
比如分区数为5个,消费者两个C1,C2,分区和消费者排序后0,1,2,3,4,消费者C1一个线程C1-0,C2两个线程C2-0,C2-1,则分配如下:
C1-0:0, 1
C2-0: 2, 3
C2-1: 4
如果有两个topic(T1,T2),分别有5个分区,那么分配结果是
C1-0:消费T1主题的0, 1分区和T2的0, 1分区;
C2-0:消费T1主题的2, 3分区和T2的2, 3分区;
C2-1:消费T1主题的4分区和T2的4分区;
使用round robin策略有两个前提条件:
1、同一个Consumer Group里面的所有消费者num.streams必须相等;
2、每个消费者订阅的主题必须相同
round robin策略的工作原理:将所有主题的分区组成为TopicAndPartition列表,然后对TopicAndPartition进行排序。
- val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
- partitions.map(partition => {
- TopicAndPartition(topic, partition)
- })
- }.toSeq.sortWith((topicPartition1, topicPartition2) => {
- topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
- })
然后按round robin风格将分区分配给不同的消费者线程。以上一个列子,每个消费者两个线程为例:
按照hashcode排序完的topic-partitions组依次是T1-4,T1-0,T1-3,T1-1,T1-2,消费者线程为C1-0,C1-1,C2-0,C2-1,最后分配结果为:
C1-0:T1-4,T1-2
C1-1:T1-0
C2-0:T1-3
C2-1:T1-1
kafka可通过ack确认及retries重试机制可保证消息发送的成功,但如果出现网络抖动、超时等情况,broker已收到消息但producer未收到确认,producer触发重试从而导致消息的重复。
kafka通过 幂等性(Idempotence)和事务(Transaction) 的机制。在0.11.0.0之前,kafka支持两种语义:At most once和At least once,之后提供了幂等的支持。
Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。
对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。
相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
开启幂等的配置项:props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
true
)
5.2、多分区幂等性
数据的发送需要放在beginTransaction和commitTransaction之间。Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。示例代码:
- KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfigs(properties));
- producer.initTransactions();
- try {
- producer.beginTransaction();
- producer.send(record1);
- producer.send(record2);
- producer.commitTransaction();
- } catch (KafkaException e) {
- producer.abortTransaction();
- }
以KafkaTemplate.send()示例,在producerFactory中开启事务功能,KafkaTransactionManager是我们配置的事务管理器。并设置TransactionIdPrefix,TransactionIdPrefix是用来生成Transactional.id的前缀。
- @Bean
- public ProducerFactory<Integer, String> producerFactory() {
- DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(senderProps());
- factory.transactionCapable();
- factory.setTransactionIdPrefix("tran-");
- return factory;
- }
-
- @Bean
- public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
- KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
- return manager;
- }
开启事务
- @Test
- @Transactional
- public void testTransactionalAnnotation() throws InterruptedException {
- kafkaTemplate.send("topic.quick.tran", "test transactional annotation");
- throw new RuntimeException("fail");
- }
- @Test
- public void testExecuteInTransaction() throws InterruptedException {
- kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
- @Override
- public Object doInOperations(KafkaOperations kafkaOperations) {
- kafkaOperations.send("topic.quick.tran", "test executeInTransaction");
- throw new RuntimeException("fail");
- //return true;
- }
- });
- }
kafka注册到zk的组件
1、Broker
连接zk创建临时节点,目录:/broker/ids。待zk创建此节点后,kafka会把这个broker的主机名和端口号记录到此节点;
2、Topic
当我们创建主题时,kafka先到/broker/ids获取当前存活的broker列表执行副本的分配方案,将方案写入/brokers/topic/<topic>节点下。更新ISR列表,目录为:/broker/topics/{topic}/partitions/{partition}/state
分配方案格式:{"version":1,"partitions":{"0":[0,1,2],"1":[1,0,2],"2":[2,0,1]}}
SR列表格式:{"controller_epoch":18,"leader":-1,"version":1,"leader_epoch":86,"isr":[]}"1":[1,0,2],"2":[2,0,1]}}
Consumer
当有新的消费者时,会注册到/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者会将自己订阅的topic信息写入该临时节点。
页面缓存技术+磁盘顺序写+零拷贝技术
页面缓存技术:页缓存是linux内核中的一种重要的高速磁盘缓存,是计算机随机存取器RAM中的一块区域,主要负责用户空间与磁盘文件之间的高效读写。
磁盘顺序写:kafka是以追加的方式写到磁盘,保证磁盘数据连续紧凑,同时kafka以segment log file进行分段存储,每次访问磁盘文件时,只需寻址最后一个segment file的磁盘空间,所以能保证写入与读取效率的
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。