赞
踩
kafka的两个重要特性造就了他的可伸缩性
命令:bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --create --topic test --partitions 2 --replication-factor 1
//运行结果
bash-5.1# bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --create --topic test --partitions 2 --replication-factor 1
Created topic test.
命令:bin/kafka-topics.sh --zookeeper localhost:2181 --list
运行结果
bash-5.1# bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --list
__consumer_offsets
czh
jm
mykafka
test
bash-5.1#
命令:bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --describe --topic mykafka
运行结果
bash-5.1# bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --describe --topic mykafka
Topic: mykafka TopicId: yyJojBfxT1azmNJcUAMoaQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: mykafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0
bash-5.1#
命令:bin/kafka-console-consumer.sh --bootstrap-server 192.168.72.249:9092 --topic mykafka
运行结果
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.72.249:9092 --topic mykafka
命令:bin/kafka-console-producer.sh --broker-list 192.168.72.249:9092 --topic mykafka
bash-5.1bin/kafka-console-producer.sh --broker-list 192.168.72.249:9092 --topic mykafka
本身kafka有自己的分区策略,如果未指定,就会使用默认的分区策略。
kafka根据传递消息的key来进行分区的分配,即hash(key)%numPartitions(分区的数量),如果key相同的话,那么就会分配到统一的分区
Producer拦截器是在0.10版本引入的,主要用于实现clinets端的定制化控制逻辑
使用场景:
消息发送的过程,涉及到两个线程的协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程之间的缓冲器)中暂存,Sender线程负责将消息构成请求,并最终执行网络i/o的线程,它从RecordAccumulator中取出消息并批量发送出去,需要注意的是,kafkaProducer是线程安全的,多个线程之间可以共享使用同一个kafkaProducer对象。
这个参数是用来指定分区必须有多少个副本收到这条消息,之后生产者才会认为这条消息是写入成功的,ack是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之间的权衡
生产者从服务器收到错误有可能是临时性的错误(比如分区找不到首领),在这种情况下,如果达到了reties设置的次数,生产者机会放弃重试并返回错误,默认情况下,生产者在每次重试之间的等待100ms,可以通过retry.backoff.ms参数来修改这个事件间隔。
当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次中,该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息的格式。当批次被填满后,批次里的所有消息会被发送出去,不过生产者并不一定会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次可能被发送,所以就算把batch.size设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁的发送消息而增加一些额外的开销。
该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指定单个请求里所有消息的总大小,broker对可接收的消息最大值也有自己的限制(message.max.size),所以两边的配置最好匹配,避免生产者发送的消息被broker拒绝。
消费者和消费组:
消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息,假设有一个T1主题,该主题有4个分区,同时我们有一个消费组G1,这个消费组只有一个消费者C1,那么消费者会收到这4个分区的消息,如下所示:
kafka一个很重要的特性是,只需要写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。应用需要有不同的消费组,对于上面的例子,例如我们新增了一个全新的消费组G2,而这个消费组有两个消费者,那么会是这样的:(不同业务接收同一个消息例如下单后的短信和邮件通知功能)
创建完消费者后我们可以订阅主题,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic))
另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接kafka与其他系统时非常有用,比如订阅所有的测试主题:
consumer.subscribe(Pattern.compile("jm"))
指定订阅的分区
//指定订阅的分区
consumer.assign(Arrays.asList(new TopicPartition("topic0701",0)))
//与kafkaProducer中设置保持一致
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDesrializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
对于kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。
当我们调用poll()时,该方法会返回我们没有消费的消息,当消息从broler返回消费者时,broker并不跟踪这些消息是否被消费者收到;kafka让消费者自身来管理消费的位移,并小消费者提供更新位移的接口,这种更新位移的方式称为提交。
自动提交
这种方式让消费者来管理位移,应用本身不需要显式操作,当我们将enable.auto.commint设置为true,那么消费者会在poll方法调用后每隔5秒,(由auto.commit.interval.ms指定)提交一次位移,和其他很多操作一样,自动提交也是由poll()方法来驱动的,在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移值。
需要注意的是,这种方式可能会导致消息重复消费,假如,某个消费者poll消息后,应用正在处理消息,在3秒后,kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
同步提交
异步提交
手动提交有一个缺点,就是当发起提交时应用会阻塞,当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和手动提交一样),另外一个解决方案是,使用异步提交的api
但异步提交也有个缺点,就是如果服务器返回提交失败,异步提交不会进行重试,相比较,同步提交会进行重试知道成功或者最后抛出异常给应用,异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖,举个例子,假如我们发起了一个异步提交commitA,此时的提交位移是2000,随后又发起了一个异步提交commitB的位移为3000,commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上已经提交的位移从3000回滚到2000,导致消息重复消费。
指定位移消费
到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来进行处理的,但是这个方法对于普通开发人员来说就是黑盒处理,无法精确掌握其消费的起始位置
seek()方法正好提供这个功能,让我们得以追踪之前的消费或者回溯消费。
指定从分区末尾开始消费,
Map<TopicPartition,Long> offsets = consumer.endoffsets(assignment);
for(TopicPartition tp :assignment){
consumer.seek(tp,offsets.get(tp))
}
再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全的删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取消息的。
Fetch.min.bytes
这个参数允许消费者指定从broker读取消息时最小的数据量,当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者,对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间,而对于有大量消费者的主题来说,则可以明显减轻broker压力
Fetch.max.wait.ms
上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间的阻塞,这个参数默认时500ms
Max.partition.fetch.bytes
这个参数指定了每个分区返回的最多字节数,默认为1M,也就是说,kafkaCOnsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M,如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息,实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。
Max.poll.records
这个参数控制一个poll调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量
kafka可以将主题划分为多个分区(partition),会根据分区规则选择吧消息存储到哪个分区中,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样实现了负载均衡和水平扩展,另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力。
由于消息时以追加到分区中的,多个分区顺序写磁盘的总效率要比随机写内存还高,是kafka高吞吐率的重要保证之一。
由于producer和Consumer都只会与leader角色的分区副本相连,所以kafka需要以集群的组织形式提供主题下的消息高可用,kafka支持主被复制,所以消息具备高可用和持久性。
一个分区可以有多个副本,这些副本保存在不同的broker上,每个分区的副本中都会有一个作为leader,当一个broker失败时,lerder在这台borker上的分区都会变的不可用,kafka会自动移除leader,再其他副本中选择一个作为新的leader。
在通常情况下,增加分区可以提供kafka集群的吞吐量,然而,也应该意识到集群的总分区数,或者单台服务器上的分区数过多,会增加不可用或者延迟的风险。
如果在某个分区上leader挂了,那么其他副本会进行选举产生一个新的leader,之后所有的读写就会转移到这个新的leader上,在kafka中,其不是采用常见的多数选举方式进行副本的leader选举,而是会在zookeeper上针对每个topic维护一个成为ISR的集合,显然还有一些副本没来得及同步,只有这个ISR列表里面的才有资格成为leader。
通过ISR,kafka需要的冗余度比较低,可以容忍的失败数比较高,假设某个topic上有f+1个副本,kafkake可以容忍f个不可用,当然,如果全部ISR里面的副本不可用,也可以选择其他可用的副本,只是存在的数据不一致。
在部署好的kafka集群中添加机器是正常需求,但是新添加的kafka节点并不会自动的分配数据,所以无法分担集群的负载,除非我们新建一个topic,但是我们想手动将部分分区移到新添加的kafka节点上,kafka内部提供了相关的工具来重新分配某个topic的分区
具体步骤:
bin/kafka-topic.sh --create --zookeeper localhost:2181 --topic jm --partitions 3 --replication-factor 3
详情查看
bin/kafka-topic.sh --describe --zookeeper localhost:2181 --topic jm
bin/kafka-topic.sh --alter --zookeeper localhost:2181 --topic jm
添加一个broker节点
查看主题信息
bin/kafka-topic.sh --describe --zookeeper localhost:2181 --topic jm
重新分配
我们需要将原来分布在broker 1-3节点上的分区重新分布到1-4节点上,借助kafka-reassign-partitions.sh工具生成reassign plan,不过我们得按照要求定义一个文件,里面说明哪些topic需要重新分区,文件内容如下:
cat reassign.json
{"topics":[{"topic":"jm"}],
"version":1
}
然后使用Kafka-reassign-partitions.sh工具生成 reassign plan
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topic-to-move-json-file reassign.json --broker-list"0,1,2,3" --generate
–generate 表示指定类型参数
–topics-to-move-json-file 指定分区重新分配对应的主题清单路径
命令输入两个json字符串,第一个json内容为当前的分区副本分配情情况,第二个为重新分类的候选方案,注意这里只是生成了一份可行性的方案,并没真正的执行重新分配的动作
我们将第二个json内容保存到名为result.json文件中,然后执行这些rreassign plan;命令如下:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignm
查看重新分区的进度
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file result.json --verify
RangeAssignor分配策略
RangeAssignor策略是的原理是按照消费者总数进行整除运算获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀地分配给所有的消费者,对于每一个topic,RangeAssignor策略会将消费组内所有订阅这个topic的消费者按照名称的字典排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区
假设n=分区数量/消费者数量,m=分区水量%消费者数量,那么前m个消费者每个分配n+1个分区,后面的消费者每个分配n个分区
假设消费组内有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有4个分区,那么所订阅的所有分区可以标识为t0p0、t0p1、t0p2、t0p3、t1p1、t1p2、t1p3.最终所有分配结果为:
消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t0p3、t1p2、t1p3
假设上面的例子中2个主题只有三个分区,那么所订阅的所有分区可以表示为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2.最终分配结果为:
消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t1p2
可以明显的看到这样的分配是不均匀的,如果将此类情形扩大,有可能出现部分消费者过载的情况。
RoundRobinAssignor分配策略
RoundRobinAssignor策略的原理是将消费组内的所有消费者以及订阅者所订阅的所有topic的partititon按照字典序排序,然后通过轮询的方式逐个将分区以此分配给每个消费者,RoundRobinAssignor策略对应的partition.assignment.strategy参数值为org.apche.kafka.clients.consumer.RoundRobinAssignor
假设消费组中有两个消费者C0和C1,都订阅了主题t0和t1,并且每个主题有三个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2.最终分配结果为:
消费者C0:t0p0、t0p2、t1p1
消费者C1:t0p1、t1p0、t1p2
如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配的不均匀,如果某个消费者没有订阅消费组内某个topic,那么在分配分区的时候此消费者将分配不到的这个topic的任何分区,
假如消费组内有3个消费者C0、C1和C2,它们共订阅了3个主题,t0、t1、t2,这三个主题分别1、2、3个分区,即整个消费组订阅了t0p1、t1p0、t1p1、t2p0、t2p1、t2p2这六个分区,具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者c2订阅的是t0、t1和t2,那么最终分配结果:
消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2
可以看到RoundRobinAssignor策略也不是十分完美的,这样分配其实不是最优结果,因为完全可以将分区t1p1分配给消费者1.
StickyAssignor分配策略
kafka从0.11.X版本开始引入这种分配策略,它主要有两个目的
当两者发生冲突时,第一个目标优先于第二个目标,鉴于这两个目标, StickyAssignor的策略的具体实现要比RoundRobinAssignor和RangeAssignor的这两种分配策略复杂的多。
假设消费组内有3个消费者,C0、C1、C2,它们都订阅了4个主题,t0、t1、t2、t3,并且每个主题有两个分区,也就是说整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t3p0、t301这八个分区,最终的分配结果如下:
消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1
假设此时消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配,如果采用了ROundRobinAssignor策略,那么此时的分配结果如下
消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1
如分配结果所示,RoundRobingAssignor策略会按照消费者C0和C2进行重新轮询分配,而如果此时使用的是StickyAssignor策略,分配结果如下
消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t0p1、t1p0、t3p1、t2p1
可以看到分配结果还保留着上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的负担分配给剩余的两个消费者C0和C2,最终C0和C2的分配还保持了平衡
自定义分配策略
需要实现:org.apche.kafka.clients.consumer.internals.PartitionAssignor
继承:org.apche.kafka.clients.consumer.internals.AbstractPartitionAssignor
Segment file组成:由两部分组成 分别为index file和data file,此2个文件一一对应,成对出现后,后缀.index和.login分别表示segment索引文件、数据文件
segment文件命名规则:partition全局的第一个segment从0开始,兴许每一个segment文件名称为上一个segment文件最后一条消息的offset值。
数组最大为64位long大小,19位数字字符长度,没有数字用0填充
kafka解决查询效率的方法之一是将数据文件分段,比如有100条message,它们的offset是从0到99,假设将数据文件分成5段,第一段位0-19,第二段位20-39,以此类推,每段放在一个单独的数据文件中,数据文件以该段中的最小offset命名,这样在查找指定offset的message的时候,用二分法查找就可以定位到该message在哪个段中。
数据文件分段使得可以在一个较小的数据文件中查找对应的offset的message了,但是这依然需要顺序扫描才能找到对应的offset的message,为了进一步提高查询效率,kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名称一样,只是文件扩展名为.index
比如,要查找绝对索引offset为7的message
首先是用二分查找法确定它在哪个logSegment中,自然是在第一个segment中,打开这个segment的index文件,也是用二分查找法找到小于或者等于offset的索引条目中最大的那个offset,自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的message在数据文件中的位置为9807.
打开数据文件,从位置9807的那个地方开始顺序扫描到o ffset为7的那条message.
这套机制是建立在offset是有序的,索引文件被映射到内存中,所以查找的速度还是很快的。
总结:kafka的message存储采用了分区(partition)、分段(LogSegment)和稀疏索引这几个手段来达到高效性。
kafka日志管理器允许定制删除策略,目前的策略是删除修改时间在N天以上的日志(按时间删除),也可以使用另外一个策略,保留最后N GB的数据(按大小删除),为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照的副本上进行的,这类似于java的CopyOnWriteArrayList
kafka消费日志删除思想,kafka把topic中一个partition大文件分成了多个小文件段,通过小文件段,就容易定期清理或删除已经消费完的文件,减少磁盘占用
log.cleanip.policy=delete 启用删除策略
直接删除,删除后的消息不可恢复,可配置一下两个策略
清理超过指定时间的:
log.retention.hours=16
超过指定大小后,删除旧的消息
log.retention.bytes = 1073734556
将数据压缩,只保留每个key最后一个版本的数据,首先在broker的配置中设置log.cleaner.enable=true启用cleaner,这个默认是关闭的,在topic的配置中设置log.cleanup.policy=compact启用压缩策略
压缩后的offset可能是不连续的。比如上图中没有5和7,因为这些offset的消息被merge了,当从这些offset消费消息时,将会拿到比这个offset打的offset对应的消息,比如试图拿到offset为5的消息,实际上会拿到offset为6的消息,并从这个位置开始消费。
这种策略只适合特种场景,比如消息的key是用户id,消息体是用户的资料,通过这种压缩策略,整个消息集里保存了所有用户的最新消息。
压缩策略支持删除,当某个key的最新版本的消息没有内容时,这个key被删除,这也符合以上逻辑。
kafka在设计时,采用了文件追加的方式来写入消息。即只能在日志文件的尾部追加新的消息。并不会修改已经写入的消息。这种方式属于典型的顺序写入磁盘的操作。所以就算时kafka的使用磁盘作为介质,所能实现的吞吐量也是非常可观的。
kafka中大量使用页缓存,这页是kafka实现高吞吐量的重要因素之一。
除了消息顺序追加,页缓存等技术,kafka还是用了零拷贝的技术来进一步提升性能,“零拷贝技术”只用将磁盘文件的数据复制到页面的缓存中一次,然后u讲数据从页面缓存直接发送到网络中,(发送给不同的订阅着时,可以使用同一个页面缓存)避免大量重复操作,如果有10个消费者,传统方式下,复制数据的次数为4*10=40次,而使用“零拷贝技术”,只需要1—+10=11次,一次从磁盘复制到页面缓存,10次代表10个消费者各读取一次页面缓存。
所谓幂等性就是对接口的多次调用所产生的结果和调用一次时一致的,生产者在进行重试的时候有可能会重复写入消息,使用kafka的幂等性功能就可以避免这种情况。
幂等性是有条件的:
producer使用幂等性的示例非常简单,与正常的情况下Producer使用相比较变化不大,只是需要把Producerd的配置enable.idempotence设置为true即可,默认为true 如下所示:
Properties props = new Propertues();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE__CONFG,"true");
props.put("acks","all") //当enable.idempotence 为true,这里默认为all
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
kafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic,"test"));
幂等性并不能跨多个分区运作,而事务可以弥补这个缺憾,事务可以保证对多个分区写入操作的原子性,操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功部分失败的可能性。
为了实现事务,应用程序必须提供唯一的transactionalid,这个参数通过客户端程序来进行设定。
properties.put(ProducerCOnfig.TRANSACTIONAL_ID_CONFIG,transactionid);
事务要求生产者开启幂等性特性,因此通过transactionalid参数设置为非空从而开启事务特性的同时需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认为true),如果显示设置为false,则会抛出异常。
kafkaProducer提供了5个与事务相关的方法,如下:
//初始化事务,前提是配置了transactionalid
public void initTransactions(){
//开启事务
public void beginTransaction();
//为消费者提供事务内的位移提交操作
public void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets,String consumerGroupId)
//提交事务
public void commitTransaction()
//终止事务 ,蕾丝回滚
public void aboraTransaction()
}
在kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(kafka COntroller),它负责管理整个集群中所有分区和副本的状态,当某个分区的leader副本出现故障时,由控制器负责为该分区选择新的leader副本,当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其原数据信息,当使用kafka——topics.sh脚本为某个topic增加分区时,同样由控制器负责分区的重新分配。
kafkad的控制器选举工作依赖于zookeeper,成功竞选为控制器的broker会在zookeeper中创建controller这个临时节点,
在任意时刻,集群中有且仅有一个控制器,每个broker启动的时候会尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其他broker节点成功竞选为控制器,所以当前broker节点放弃竞选,如果zookeeper中不存在/controller这个节点,或者这个节点的数据异常,那么就会尝试创建/controller这个节点,当前broker去创建节点时,也有可能其他broker同时去尝试创建broker节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker表示竞选失败,每个broker会在内存中保存当前控制器的broker的值,这个值可以标识为activecontrollerid,
zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久节点,节点中存放的是一个整型的controller_epoch值,controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也称为控制器纪元。
controller_epoch的初始值为1,即集群中的第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1,每个和控制器交互的请求都会携带上controller_epoch字段,如果请求的controller_epoch小于内存中的controller_epoch值,则认为这个请求是已经过期的控制器发送的请求,那么这个请求会认为是无效请求,如果请求的controller_epoch值大于内存中的controller_epoch值,那么说明已经有新的控制器当选了,由此可见,kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。
具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:
可靠性保证,确保系统在各种不同环境中能够发生一致的行为,
kafka的保证
保证分区消息的顺序
只有当消息被写入分区的所有同步副本时,它才会认为是已提交状态。
只要还有一个副本时活跃的,那么已提交的消息就不会丢失
消费者只能读取已经提交的消息
失效副本
怎么判断一个分区是否有副本是处于同步失效状态呢,从kafka0.9.X版本开始通过唯一的一个参数replica.lag.time.max.ms(默认大小为10000)来控制,当ISR中的一个follower副本滞后leader副本的时间超过参数replica.lag.time.max.ms指定的时间即判定为副本失效,需要将次follower副本剔出除ISR之外,具体实现原理很简单,当follower副本将leader副本的LED(Log End Offset,每个分区的最后一条消息的位置)之前的日志钱不同步,则认为该follower副本已经追赶leader副本,此时更为该副本的lastCaughUpTimeMs标识。kafka的副本管理器启动时会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的lastCaughUpTimeMs差值是否大于参数replica.lag.time.max.ms指定的值,千万不要错误的认为follower副本只要拉取leader副本的诗句就会更新lastCaughtUpTimeMs,试想当leader副本的消息流入速度大于follower副本的拉取速度时,follower副本一致不断的拉取leader副本的消息也不能与leader副本同步,如果此follower副本处于ISR中,那么当leader副本失效时,而选取此follower副本为新的leader副本,那么就会有严重的消息丢失。
副本复制
kafka中的每个主题分区都被复制n次,其中的n是主题的复制因子,这允许kafka在集群服务器发生故障时自动切换这些副本,以便在出现故障时消息仍可用,kafka的复制时以分区为粒度的,分区的预写日志被复制到n个服务器,在n个副本中,一个副本作为leader,其他副本成为followers,顾名思义,producer只能往leader分区上写数据(读也只能从leader分区上进行),followers只按照顺序从leader上复制日志
一个副本可以不同步leader有如下几个原因
新启动副本,当用户给主题增加副本因子时,新的follower不再同步副本的列表中,知道他们完全赶上了leader日志
如何确定副本时滞后的
在服务器端现在只有一个参数需配置replica.lag.time.max.mx,这个参数解释replicas响应partition leader的最长等待时间,检测卡住或者失效副本的探测,如果一个replica失败导致发送来取请求时间查过replica.lag.time.max.mx,kafka会认为此replica已经死亡会从同步副本列表中剔除,检测慢副本机制会发生变化,如果一个replica开始落后于leader超过replica.lag.time.max.mx,,kafka会认为太缓慢并且从同步副本中移除,除非replica请求leader时间间隔大于replica.lag.time.max.mx,因此即使leader使流量激增和大批量写消息,kafka也不会从同步副本中移除该副本。
Kafka 0.11.0.0版本解决方案
早上上述两个问题的根本原因是在HW值被用于衡量副本的成功与否以及出现在failture时作为日志截断的依据,但HW的值更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能完成更新,故这中间发生的任何崩溃都会导致HW的值过期,鉴于这些原因,Kafka0.11版本引入的leader epoch来取代HW的值,leader端多开辟了一段内存区域专门保存leader的epoch消息,这样即使出现上看的场景也能很好的规避这些问题
所谓leader epoch实际上是一对值,(epoch,offset)epoch表示leader的版本号,从0开始,当leader变更过1次,epoch就会+1,而offset则对应于该epoch版本的leader写入第一条消息的唯一,因此假设两个值
(0,0)
(1,120) 表示第一个leader从位移0开始写入消息,共写了120条(0,119),而第二个leader版本号为1,从位移120处开始写入消息
Leader broker中会保存这样一个缓存,并定期写入一个checkpoint文件中。
避免数据丢失:
避免数据不一致:
生产发送的消息没有收到正确的broker响应,导致producer重试
producer发出一条消息,broker落盘后因为网络原因发送端得到一个发送至呗的响应或者网络中断,然后producer收到一个可恢复的Exception重试消息导致消息发送失败
解决方案:
启动kafka的幂等性
Ack = 0 不重试
根本原因:
数据消费完没有及时的提交offset到broker
解决方案
取消自动提交
下游做幂等
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo(消费组名)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --state
消费组内成员信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --members
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group group.demo
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。