当前位置:   article > 正文

Kafka(十):Kafka概述_kafka 拦截器 对不满足条件的数据放入队列进行等待

kafka 拦截器 对不满足条件的数据放入队列进行等待

第1章 Kafka概述

尚硅谷大数据技术之Kafka
 

1.2.2 消息队列的两种模式

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。

消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。尚硅谷大数据技术之Kafka

注意:

1、Topic作用:对消息分类,生产者发往对应topic,消费者根据topic取数据。

2、Partition作用:对kafka来说,实现topic存储的负载均衡;提高读写并发度(消费并发度)。

3、消费者组作用:提高读写并发,提高消费能力(原来仅仅ConsumeA消费Topic的分区P0和P1,现在消费者组内ConsumeA消费Topic的P0,ConsumerB消费Topic的P1)

   同一个消费者组里的不同消费者不能同时消费同一个消息。可以把整个消费组当成一个整体。

4、消费者组内的消费者个数不能超过分区数(超过的消费者组内的消费者被创建了在运行,但不执行任何任务)

消费速度最高是:消费者组内的消费者个数等于分区数

5、zk作用:1)帮kafka存储一些信息。2)当消费者A已经消费5条挂了,希望下次从第6次开始消费(offset消费的信息保存着zk中)。

6、副本作用:备份/容灾


尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

第2章 Kafka架构深入

2.1 Kafka工作流程及文件存储机制

注意:

1)、每个partition都维护一个offset。

2)、kafka只能保证区内有序,不能保证全局有序。

3)、Partition是物理分区。每个partition对应一个log文件。

尚硅谷大数据技术之Kafka
尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

1)副本数据同步策略

方案

优点

缺点

半数以上完成同步,就发送ack

延迟低

选举新的leader时,容忍n台节点的故障,需要 2n+1个副本

全部完成同步,才发送ack

选举新的leader时,容忍n台节点的故障,需要n+1个副本

延迟高

Kafka选择了第二种方案,原因如下:

1.同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。

2.虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。

2)ISR

ISR功能:同步副本/leader挂后选择新的leader

       采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?

       Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。

当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

为了保证少的丢失数据,在重新选举leader时需要看两个因素:1)当leader刚获取数据,此时follower1同步8条,follower2同步10条,当leader挂了以后,选择follower2作为新的leader;2)数据同步时间短。

条件一:假设follower比leader的消息条数少10条时,会将该follower剔除isr,当满足时间的阈值后又会从新加入到isr中,所以不断剔除/加入。所以当一次batch写入的数据大于10条,如12条时,isr会将所有的follower剔除isr。

实际上batch是频繁的操作,这就导致频繁的剔除isr/加入isr中,频繁内存操作,并且数据存在与zk中,导致频繁操作zk。

3)ack应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。

所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。

4、ack=-1会导致数据重复

当producer成功发送数据给leader,此时follower也全部成功同步数据完成。此时leader挂了没返回ack给producer。则需要从ISR中选择新的leader,此时producer会从新发送数据到新的leader中,导致数据重复。

2.2 数据一致性问题

1、问题:当leader含有消息10条,follower1含有消息9条,follower2含有消息8条。当leader挂了此时选择follower2作为新的leader,则消费者只能消费9条数据,而此时leader又复活了,消费者又可以消费10条数据,导致数据不一致?

HW之前的数据才能对Consumer可见。保证了消费者消费的一致性。

ACK解决数据不丢失和不重复问题。

HW保证副本数据一致性问题。

2.3、Exactly Once

第3章 Kafka消费者

3.3.1 消费方式

consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。

3.3.2 分区分配策略

一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。

Kafka有两种分配策略,一是RoundRobin,一是Range。

尚硅谷大数据技术之Kafka

使用RoundRobin前提:一个消费者组订阅的消息是同一个topic。

Range: 是按照范围分配。

Range问题:随着topic增加,range造成负载不均衡。例如有两个topic(topic1有消息123,topic2有消息123),此时会将topic中12/topic2中12发给consumer1,topic中3/topic2中3发给consumer2.



尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

3.3.3 Kafka高效读写数据

1、顺序写:600M/s,开辟一块空间顺序写。

    随机写:100K/s,需要寻址后写入。

2、零复制

当需要将项目某个a目录下a.txt文件copy一份到b目录下b.txt中,需要使用IO流

File先读到内核态(File Cache),在读到用户态(App代码),用户态再将文件写入到内核态(Socket Cache),在写到磁盘。
尚硅谷大数据技术之Kafka

零拷贝不用经过用户态,仅仅通过内核态操作即可。



尚硅谷大数据技术之Kafka

第4章 Kafka API

4.1 Producer API

4.1.1 消息发送流程

有拦截器的框架一定先走拦截器。

尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

尚硅谷大数据技术之Kafka

2.带回调函数的API

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

尚硅谷大数据技术之Kafka
 

4.1.3 同步发送API

            同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。

由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。

尚硅谷大数据技术之Kafka

4.2 Consumer API

Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

所以offset的维护是Consumer消费数据是必须考虑的问题。

4.2.1 自动提交offset

1)导入依赖

org.apache.kafka

kafka-clients

0.11.0.0

2)编写代码

需要用到的类:

KafkaConsumer:需要创建一个消费者对象,用来消费数据

ConsumerConfig:获取所需的一系列配置参数

ConsuemrRecord:每条数据都要封装成一个ConsumerRecord对象

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

自动提交offset的相关参数:

enable.auto.commit是否开启自动提交offset功能

auto.commit.interval.ms自动提交offset的时间间隔

以下为自动提交offset的代码:

尚硅谷大数据技术之Kafka

4.2.2 手动提交offset

虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

1)同步提交offset

由于同步提交offset有失败重试机制,故更加可靠,以下为同步提交offset的示例。

尚硅谷大数据技术之Kafka

2)异步提交offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。

以下为异步提交offset的示例:

  1. public class CustomConsumer {
  2.     public static void main(String[] args) {
  3.         Properties props = new Properties();
  4.         //Kafka集群
  5.         props.put("bootstrap.servers", "hadoop102:9092");
  6.         //消费者组,只要group.id相同,就属于同一个消费者组
  7.         props.put("group.id", "test");
  8.         //关闭自动提交offset
  9.         props.put("enable.auto.commit", "false");
  10.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  11.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  12.         KafkaConsumerconsumer = new KafkaConsumer<>(props);
  13.         consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
  14.         while (true) {
  15.             ConsumerRecords records = consumer.poll(100);//消费者拉取数据
  16.             for (ConsumerRecord record : records) {
  17.                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  18.             }
  19. //异步提交
  20.             consumer.commitAsync(new OffsetCommitCallback() {
  21.                 @Override
  22.                 public void onComplete(Map offsets, Exception exception) {
  23.                     if (exception != null) {
  24.                         System.err.println("Commit failed for" + offsets);
  25.                     }
  26.                 }
  27.             });
  28.         }
  29.     }
  30. }

3) 数据漏消费和重复消费分析

无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。

4.3 自定义Interceptor

4.3.1 拦截器原理

Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑

对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

(1)configure(configs)

获取配置信息和初始化数据时调用。

(2)onSend(ProducerRecord):

该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。

(3)onAcknowledgement(RecordMetadata, Exception):

该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。

(4)close:

关闭interceptor,主要用于执行一些资源清理工作

如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

4.3.2 拦截器案例

尚硅谷大数据技术之Kafka
2)案例实操

(1)增加时间戳拦截器

  1. public class TimeInterceptor implements ProducerInterceptor {
  2.    @Override
  3.    public void configure(Map configs) {
  4.    }
  5.    @Override
  6.    public ProducerRecordonSend(ProducerRecord record) {
  7.       // 创建一个新的record,把时间戳写入消息体的最前部
  8.      return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
  9.              System.currentTimeMillis() + "," + record.value().toString());
  10.    }
  11.    @Override
  12.    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  13.    }
  14.    @Override
  15.    public void close() {
  16.    }
  17. }

(2)统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器

  1. public class CounterInterceptor implements ProducerInterceptor{
  2.     private int errorCounter = 0;
  3.     private int successCounter = 0;
  4.    @Override
  5.    public void configure(Map configs) {
  6.    }
  7.    @Override
  8.    public ProducerRecord onSend(ProducerRecord record) {
  9.        return record;
  10.    }
  11.    @Override
  12.    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  13.       // 统计成功和失败的次数
  14.         if (exception == null) {
  15.             successCounter++;
  16.         } else {
  17.             errorCounter++;
  18.         }
  19.    }
  20.    @Override
  21.    public void close() {
  22.         // 保存结果
  23.         System.out.println("Successful sent: " + successCounter);
  24.         System.out.println("Failed sent: " + errorCounter);
  25.    }
  26. }

(3)producer主程序

  1. public class InterceptorProducer {
  2.    public static void main(String[] args) throws Exception {
  3.       // 1 设置配置信息
  4.       Properties props = new Properties();
  5.       props.put("bootstrap.servers", "hadoop102:9092");
  6.       props.put("acks", "all");
  7.       props.put("retries", 0);
  8.       props.put("batch.size", 16384);
  9.       props.put("linger.ms", 1);
  10.       props.put("buffer.memory", 33554432);
  11.       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  12.       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  13.       // 2 构建拦截链
  14.       List interceptors = new ArrayList<>();
  15.    interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor"); interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor");
  16.       props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); 
  17.       String topic = "first";
  18.       Producer producer = new KafkaProducer<>(props);   
  19.       // 3 发送消息
  20.       for (int i = 0; i < 10; i++) {
  21.           ProducerRecord record = new ProducerRecord<>(topic, "message" + i);
  22.           producer.send(record);
  23.      }
  24.        
  25.       // 4 一定要关闭producer,这样才会调用interceptor的close方法
  26.       producer.close();
  27.    }
  28. }

3)测试

(1)在kafka上启动消费者,然后运行客户端java程序。

  1. 1501904047034,message0
  2. 1501904047225,message1
  3. 1501904047230,message2
  4. 1501904047234,message3
  5. 1501904047236,message4
  6. 1501904047240,message5
  7. 1501904047243,message6
  8. 1501904047246,message7
  9. 1501904047249,message8
  10. 1501904047252,message9

第5章 Kafka面试题

5.1 面试问题

1.Kafka中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)又代表什么?

2.Kafka中的HW、LEO等分别代表什么?

3.Kafka中是怎么体现消息顺序性的?

4.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

5.Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?

6.“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?

7.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

8.有哪些情形会造成重复消费?

9.那些情景会造成消息漏消费?

10.当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?

    1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first

    2)触发Controller的监听程序

    3)kafka Controller 负责topic的创建工作,并更新metadata cache

11.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?

12.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?

13.Kafka有内部的topic吗?如果有是什么?有什么所用?

14.Kafka分区分配的概念?

15.简述Kafka的日志目录结构?

16.如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?

17.聊一聊Kafka Controller的作用?

18.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?

19.失效副本是指什么?有那些应对措施?

20.Kafka的哪些设计让它有如此高的性能?

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/471634
推荐阅读
相关标签
  

闽ICP备14008679号