赞
踩
1.2.2 消息队列的两种模式
(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
注意:
1、Topic作用:对消息分类,生产者发往对应topic,消费者根据topic取数据。
2、Partition作用:对kafka来说,实现topic存储的负载均衡;提高读写并发度(消费并发度)。
3、消费者组作用:提高读写并发,提高消费能力(原来仅仅ConsumeA消费Topic的分区P0和P1,现在消费者组内ConsumeA消费Topic的P0,ConsumerB消费Topic的P1)
同一个消费者组里的不同消费者不能同时消费同一个消息。可以把整个消费组当成一个整体。
4、消费者组内的消费者个数不能超过分区数(超过的消费者组内的消费者被创建了在运行,但不执行任何任务)
消费速度最高是:消费者组内的消费者个数等于分区数
5、zk作用:1)帮kafka存储一些信息。2)当消费者A已经消费5条挂了,希望下次从第6次开始消费(offset消费的信息保存着zk中)。
6、副本作用:备份/容灾
注意:
1)、每个partition都维护一个offset。
2)、kafka只能保证区内有序,不能保证全局有序。
3)、Partition是物理分区。每个partition对应一个log文件。
1)副本数据同步策略
方案 | 优点 | 缺点 |
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障,需要 2n+1个副本 |
全部完成同步,才发送ack | 选举新的leader时,容忍n台节点的故障,需要n+1个副本 | 延迟高 |
Kafka选择了第二种方案,原因如下:
1.同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
2)ISR
ISR功能:同步副本/leader挂后选择新的leader
采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。
当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。
为了保证少的丢失数据,在重新选举leader时需要看两个因素:1)当leader刚获取数据,此时follower1同步8条,follower2同步10条,当leader挂了以后,选择follower2作为新的leader;2)数据同步时间短。
条件一:假设follower比leader的消息条数少10条时,会将该follower剔除isr,当满足时间的阈值后又会从新加入到isr中,所以不断剔除/加入。所以当一次batch写入的数据大于10条,如12条时,isr会将所有的follower剔除isr。
实际上batch是频繁的操作,这就导致频繁的剔除isr/加入isr中,频繁内存操作,并且数据存在与zk中,导致频繁操作zk。
3)ack应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
4、ack=-1会导致数据重复
当producer成功发送数据给leader,此时follower也全部成功同步数据完成。此时leader挂了没返回ack给producer。则需要从ISR中选择新的leader,此时producer会从新发送数据到新的leader中,导致数据重复。
1、问题:当leader含有消息10条,follower1含有消息9条,follower2含有消息8条。当leader挂了此时选择follower2作为新的leader,则消费者只能消费9条数据,而此时leader又复活了,消费者又可以消费10条数据,导致数据不一致?
HW之前的数据才能对Consumer可见。保证了消费者消费的一致性。
ACK解决数据不丢失和不重复问题。
HW保证副本数据一致性问题。
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有两种分配策略,一是RoundRobin,一是Range。
使用RoundRobin前提:一个消费者组订阅的消息是同一个topic。
Range: 是按照范围分配。
Range问题:随着topic增加,range造成负载不均衡。例如有两个topic(topic1有消息123,topic2有消息123),此时会将topic中12/topic2中12发给consumer1,topic中3/topic2中3发给consumer2.
1、顺序写:600M/s,开辟一块空间顺序写。
随机写:100K/s,需要寻址后写入。
2、零复制
当需要将项目某个a目录下a.txt文件copy一份到b目录下b.txt中,需要使用IO流
File先读到内核态(File Cache),在读到用户态(App代码),用户态再将文件写入到内核态(Socket Cache),在写到磁盘。
零拷贝不用经过用户态,仅仅通过内核态操作即可。
有拦截器的框架一定先走拦截器。
2.带回调函数的API
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。
Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
所以offset的维护是Consumer消费数据是必须考虑的问题。
1)导入依赖
org.apache.kafka
kafka-clients
0.11.0.0
2)编写代码
需要用到的类:
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个ConsumerRecord对象
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
enable.auto.commit:是否开启自动提交offset功能
auto.commit.interval.ms:自动提交offset的时间间隔
以下为自动提交offset的代码:
虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
1)同步提交offset
由于同步提交offset有失败重试机制,故更加可靠,以下为同步提交offset的示例。
2)异步提交offset
虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
以下为异步提交offset的示例:
- public class CustomConsumer {
-
-
- public static void main(String[] args) {
-
-
- Properties props = new Properties();
-
- //Kafka集群
-
- props.put("bootstrap.servers", "hadoop102:9092");
-
- //消费者组,只要group.id相同,就属于同一个消费者组
- props.put("group.id", "test");
-
- //关闭自动提交offset
- props.put("enable.auto.commit", "false");
-
-
-
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
-
- KafkaConsumerconsumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
- while (true) {
- ConsumerRecords records = consumer.poll(100);//消费者拉取数据
- for (ConsumerRecord record : records) {
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
-
- //异步提交
- consumer.commitAsync(new OffsetCommitCallback() {
- @Override
- public void onComplete(Map offsets, Exception exception) {
- if (exception != null) {
- System.err.println("Commit failed for" + offsets);
-
- }
- }
- });
- }
- }
-
- }
3) 数据漏消费和重复消费分析
无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。
Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
(4)close:
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
(1)增加时间戳拦截器
- public class TimeInterceptor implements ProducerInterceptor {
-
- @Override
- public void configure(Map configs) {
- }
-
- @Override
- public ProducerRecordonSend(ProducerRecord record) {
- // 创建一个新的record,把时间戳写入消息体的最前部
- return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
- System.currentTimeMillis() + "," + record.value().toString());
- }
-
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- }
-
- @Override
- public void close() {
- }
- }
(2)统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器
- public class CounterInterceptor implements ProducerInterceptor{
-
- private int errorCounter = 0;
- private int successCounter = 0;
-
- @Override
- public void configure(Map configs) {
- }
-
- @Override
- public ProducerRecord onSend(ProducerRecord record) {
- return record;
- }
-
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- // 统计成功和失败的次数
- if (exception == null) {
- successCounter++;
- } else {
- errorCounter++;
- }
- }
-
- @Override
- public void close() {
- // 保存结果
- System.out.println("Successful sent: " + successCounter);
- System.out.println("Failed sent: " + errorCounter);
- }
- }
(3)producer主程序
- public class InterceptorProducer {
-
- public static void main(String[] args) throws Exception {
-
- // 1 设置配置信息
- Properties props = new Properties();
- props.put("bootstrap.servers", "hadoop102:9092");
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
-
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- // 2 构建拦截链
- List interceptors = new ArrayList<>();
- interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor"); interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor");
-
- props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
- String topic = "first";
- Producer producer = new KafkaProducer<>(props);
-
- // 3 发送消息
- for (int i = 0; i < 10; i++) {
- ProducerRecord record = new ProducerRecord<>(topic, "message" + i);
- producer.send(record);
- }
-
- // 4 一定要关闭producer,这样才会调用interceptor的close方法
- producer.close();
- }
- }
-
3)测试
(1)在kafka上启动消费者,然后运行客户端java程序。
- 1501904047034,message0
- 1501904047225,message1
- 1501904047230,message2
- 1501904047234,message3
- 1501904047236,message4
- 1501904047240,message5
- 1501904047243,message6
- 1501904047246,message7
- 1501904047249,message8
- 1501904047252,message9
1.Kafka中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)又代表什么?
2.Kafka中的HW、LEO等分别代表什么?
3.Kafka中是怎么体现消息顺序性的?
4.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
5.Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
6.“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
7.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
8.有哪些情形会造成重复消费?
9.那些情景会造成消息漏消费?
10.当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?
1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first
2)触发Controller的监听程序
3)kafka Controller 负责topic的创建工作,并更新metadata cache
11.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
12.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
13.Kafka有内部的topic吗?如果有是什么?有什么所用?
14.Kafka分区分配的概念?
15.简述Kafka的日志目录结构?
16.如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?
17.聊一聊Kafka Controller的作用?
18.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
19.失效副本是指什么?有那些应对措施?
20.Kafka的哪些设计让它有如此高的性能?
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。