赞
踩
在kafka 0.9版本之后,kafka为了降低zookeeper的io读写,减少network data transfer,也自己实现了在kafka server上存储consumer,topic,partitions,offset信息将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。
在kafka的消费者中,有一个非常关键的机制,那就是offset机制。它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。它好比看一本书中的书签标记,每次通过书签标记(offset)就能快速找到该从哪里开始看(消费)。Kafka对于offset的处理有两种提交方式:(1) 自动提交(默认的提交方式) (2) 手动提交(可以灵活地控制offset)
Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:partation=hash(group_id)%50来计算的。
在上述代码最后调用consumer.close()时候也会触发自动提交,因为它默认autocommit=True。
对于自动提交偏移量,如果auto_commit_interval_ms的值设置的过大,当消费者在自动提交偏移量之前异常退出,将导致kafka未提交偏移量,进而出现重复消费的问题,所以建议auto_commit_interval_ms的值越小越好。
鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。
对于手动提交offset主要有3种方式:1.同步提交 2.异步提交 3.异步+同步 组合的方式提交
同步模式下提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束,同时同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出响应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。
每轮循一个批次,手动提交一次,只有当前批次的消息提交完成时才会触发poll来获取下一轮的消息,经测试10W条消息耗时4.58s
异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在broker做出响应的时候记录错误信息。
对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。
针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭时同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。
通过finally在最后不管是否异常都会触发consumer.commit()来同步补救一次,确保偏移量不会丢失
(https://www.cnblogs.com/FG123/p/10091599.html)受益匪浅
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。