赞
踩
最近在使用flink的时候,因为对kafka的基础知识了解不深入,导致了一些问题,所以趁着这个机会看了一下kafka消费端的代码,整理处理。
0.8.2版本
0.9版本
0.11 版本
1.0.0版本
对于版本的问题,一般还是根据客户端和服务端保持一致的版本,如果实在有问题,尽量保证高版本客户端兼容低版本的服务端。
里面几个比较重要的版本就是0.9版本,几乎各大厂商都是0.9版本以上,使用了新的消费客户端。
对于flink而言,如果要保证Exactly-Once, 尽量是使用0.11版本以上,如果只是客户端的一致性,就不用太过于考虑版本了。
绝大多数的应用场景,用户不需要关心consumer的上下线,自动重连,消费迁移等。
为了避免reblance造成的数据重复读取或者丢失的情况,用户手动消费具体的分区,负载均衡全部需要用户自己控制。
对于有追踪之前的数据,多次在不同的位点回溯消费等批量获取数据。而且大多数场景下需要exactly-once保证消息的不丢失和唯一性。
subscribe(), assign()在拉取数据的时候使用的是相同的方法体,关于客户端的部分参数是相同的逻辑。关于poll源码的具体逻辑可以参考这篇博文:添加链接描述
enable.auto.commit=true, 消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交的时间间隔auto.commit.interval.ms控制,默认5s。自动提交是在轮询中进行。
auto.commit.offset=false, 自行决定何时提交偏移量,分为同步提交commitSync()和commitAsync(); commitSync模式在成功提交或者碰到无法恢复的错误之前,会一直重试。而commitAsync不会, 只会试用future等待超时时间过去,之所以不会被重试,对于异步提交而言,是没有顺序的,如果同时存在多个异步提交,进行重试可能会导致位移覆盖,就会导致消息重复消费。
在触发关闭消费者或者均衡前的最后一次提交,如果异步没有提交成功就需要用同步的方式进行轮询尝试提交。
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.commitAsync(); } } catch (Exception e) { System.out.println("commit failed"); } finally { try { consumer.commitSync(); } finally { consumer.close(); }
上面讲的commitSync(), commitAsync()都是提交最后的偏移量,当然也可以通过提交特定偏移量的offset, 这个经常用在低级api里面。
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
在subscribe模式下,可以设置true/false, 在assign模式下只能为false.
如果不考虑数据丢失的情况下,可以使用自定提交的模式。
如果对于数据做了幂等的操作,数据可重复读对结果并没有影响,一般使用enable.auto.commit.
如果要求数据只处理一次,只能用手工提交,同步+异步的模式以及两阶段提交的才能够保证数据提交+offset提交是原子操作。
KafkaComsumer.poll()方法, 等待拉取的时常。如果等于0,不管buffer中有没有可用的数据,如果有直接拿到返回,没有返回empty. 如果大于0,并且buffer中没有可用的数据,会一直轮询直到对应的时间超时为止。
这个配置有一个妙用,可以帮助我们确定数据是否在正常消费,比如对于一个均匀的消息报,如果超过timeout.ms还没有收到消息,是否可以认为上游消息报发送不及时或者挂掉。合理设置timeout.ms的大小,对于我们排查问题有一定的帮助。
max.partition.fetch.bytes 表示consumer每次发起fetch请求读取到的分配的每个分区的数据最大值。并且读取到的数据放入到本地缓存。
max.poll.record 表示每次poll的时候从缓存中拉取到所分配所有分区的数据最大记录数。
这就会有一个很有意思的问题,比如每次fetch的数据是100条,每次poll的数据是10条,那么需要执行10次poll才能把一次fetch的数据读完。如果每次fetch的数据是10条,max.poll.record=50,那么每次poll的数据就是10. 所以一般而言 max.partition.fetch.bytes / per recore size >= max.poll.record 比较好,才会让线程发挥最大的价值。
heartbeat.interval.ms 心跳间隔。心跳是在consumer与coordinator之间进行的,心跳用来保持consumer的会话,并且在有consumer加入或者离开group时帮助进行rebalance。
session.timeout.ms, Consumer session 过期时间, 本身是个逻辑值,一般是需要和heartbeat.interval.ms配合使用。
heartbeat线程 每隔heartbeat.interval.ms向coordinator发送一个心跳包,证明自己还活着。只要 heartbeat线程 在 session.timeout.ms 时间内 向 coordinator发送过心跳包,那么 group coordinator就认为当前的kafka consumer是活着的
通常设置heartbeat.interval.ms < session.timeout.ms / 3。以防止突然网络卡顿的现象。
max.poll.interval.ms subscribe模式下,两次Poll之间最大的延迟时间,也是rebalance的timeout,max.poll.interval.ms = record的处理时间 + poll的时间 + commit的时间
如果两次poll时间超过当前时间,就会认为当前consumer挂了,就会导致重新reblance。
必须尽力保证一次poll的消息很快的完成,无论我们的业务代码做了什么操作(耗时的,短时的)。
如果我们业务代码是耗时的,需要适当的增大max.poll.interval.ms, 并且减少max.poll.record。对于耗时的业务代码,也可以使用异步线程去调用,以免超时造成reblance.
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic 。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。
Rebalance 发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。
提交的偏移量大于客户端处理的最后一个消息的偏移量,在发生消费关闭/reblance时候,那么处于两个偏移量之间的消息将会丢失
出现at most once的一些场景
提交的偏移量小于消费者实际处理的最后一个消息的偏移量,处于两个偏移量之间的消息会被重复处理
如果要实现这种方式,必须自己控制消息的offset,自己记录一下当前的offset,对消息的处理和offset的移动必须保持在同一个事务中,例如在同一个事务中,把消息处理的结果存到mysql数据库同时更新此时的消息的偏移,而且在很多业务中需要考虑记录的顺序执行。因为kafka本身是分区顺序保证,所以需要从业务层面去考虑这个问题
在我的经验中,kafka消费的三种模式都有用到,
每种模式都有自己的使用场景,在使用的时候我们需要考虑的点
https://www.cnblogs.com/yoke/p/11405397.html
https://cloud.tencent.com/developer/article/1676951
https://blog.csdn.net/pml18710973036/article/details/87207071
https://blog.csdn.net/weixin_39662432/article/details/111665050
https://blog.csdn.net/weixin_43956062/article/details/106781317?spm=1001.2014.3001.5501
http://kafka.apache.org/0110/documentation.html#consumerconfigs
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。