赞
踩
每当我们调用Kafka的poll()方法或者使用Spring的@KafkaListener(其实底层也是poll()方法)注解消费Kafka消息时,它都会返回之前被写入Kafka的记录,即我们组中的消费者还没有读过的记录。
这意味着我们有一种方法可以跟踪该组消费者读取过的记录。 如前所述,Kafka的一个独特特征是它不会像许多JMS队列那样跟踪消费过的记录。相反,它允许消费者使用Kafka跟踪每个分区中的位置(位移,也称偏移量)。
我们将更新分区中当前位置的操作称为提交位移(commits offset)。
offset 是 partition 中每条消息的唯一标识,是一个单调递增且不变的值,由 kafka 自动维护,offset 用于定位和记录消息在 partition 中的位置和消费进度,保证 partition 内的消息有序。
offset 是 Kafka 为每条消息分配的一个唯一的编号,它表示消息在分区中的顺序位置。offset 是从 0 开始的,每当有新的消息写入分区时,offset 就会加 1。offset 是不可变的,即使消息被删除或过期,offset 也不会改变或重用。
offset 的作用主要有两个:
offset在语义上拥有两种:Consumer Offset和Committed Offset。
消费者第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。
这样消费者下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。
这样就能够保证每次poll消息时,都能够收到不重复的消息。
Committed Offset是保存在kafka客户端上,主要通过commitSync
和commitAsync
API操作,SpringBoot集成Kafka中是使用ack.acknowledge()
。若消费者poll了消息但是不调用API,Committed Offset依旧为0。
Committed Offset主要用于Consumer Rebalance。在Consumer Rebalance的过程中,一个分区分配给了一个消费者,消费者将从Committed Offset记录的序号后开始消费。又或者消费者调用了poll消费了5条消息并调用API更新了Committed Offset,然后宕机了过了一会儿又重启了,消费者也可以通过Committed Offset得知从第6条消息消费。
Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的。Group Coordinator是运行在Kafka集群中每一个Broker内的一个进程。它主要负责Consumer Group的管理,Offset位移管理以及Consumer Rebalance。
offset 的存储和管理主要涉及到两个方面:生产者端和消费者端。
生产者在向 Kafka 发送消息时,可以指定一个分区键(Partition Key),Kafka 会根据这个键和分区算法来决定消息应该发送到哪个分区。如果没有指定分区键,Kafka 会采用轮询或随机的方式来选择分区。生产者也可以自定义分区算法。
当消息被写入到分区后,Kafka broker 会为消息分配一个 offset,并返回给生产者。生产者可以根据返回的 offset 来确认消息是否成功写入,并进行重试或其他处理。
消费者在消费 Kafka 消息时,需要维护一个当前消费的 offset 值,以及一个已提交的 offset 值。当前消费的 offset 值表示消费者正在消费的消息的位置,已提交的 offset 值表示消费者已经确认消费过的消息的位置。
消费者在消费完一条消息后,需要提交 offset 来更新已提交的 offset 值。提交 offset 的方式有两种:自动提交和手动提交。
无论是自动提交还是手动提交,offset 的实际存储位置都是在 Kafka 的一个内置主题中:__consumer_offsets。这个主题有 50 个分区(可配置),每个分区存储一部分消费组(Consumer Group)的 offset 信息。Kafka broker 会根据消费组 ID 和主题名来计算出一个哈希值,并将其映射到 __consumer_offsets 主题的某个分区上。__consumer_offsets主题包含每个分区需要提交的偏移量。 但是,如果消费者组的消费者崩溃或新的消费者加入消费者组,这将触发重新平衡(rebalance),即消费者组内的消费者负责的分区会发生变化。 在重新平衡之后,可以为每个消费者分配一组新的分区而不是之前处理的分区。 然后消费者将读取每个分区的已提交偏移量并从那里继续。
__consumer_offsets 主题是 Kafka 0.9.0 版本引入的新特性,之前的版本是将 offset 存储在 Zookeeper 中。但是 Zookeeper 不适合大量写入,因此后来改为存储在 Kafka 自身中,提高了性能和可靠性。
Kafka 提供了一个配置参数 enable.auto.commit,默认为 true,表示开启自动提交功能。自动提交功能会在后台定期(由 auto.commit.interval.ms 参数控制)将当前消费的 offset 值提交给 Kafka broker。
提交偏移量的最简单方法是允许消费者来完成。 如果配置 enable.auto.commit=true
,则消费者每五秒钟将提交客户端从poll()收到的最大偏移量。 五秒间隔是默认值,可通过设置auto.commit.interval.ms
来控制。 就像消费者中的其他机制一样,自动提交由poll loop驱动。 无论您何时轮询,消费者都会检查是否需要提交,如果是,它将提交它在上次轮询中返回的偏移量。
它实际保证的是位移至少要隔一段时间才会提交,如果你是单线程处理消息,那么只有处理完消息后才会提交位移,可能远比你设置的间隔长,因为你的处理逻辑可能需要一定的时间。
数据重复写入
假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。
位移提交和rebalance
虽然自动提交很方便,但是它也有一定的不足。
请注意,默认情况下,自动提交每五秒钟发生一次。 假设我们在最近的提交之后三秒钟并且触发了重新平衡。 在重新平衡之后,所有消费者将从最后提交的偏移开始消费。 在这种情况下,偏移量是三秒钟之前的偏移量,因此在这三秒内到达的所有事件将被处理两次。 可以将提交间隔配置为更频繁地提交并减少记录将被复制的窗口,但是不可能完全消除它们。这是自动提交机制的一个缺陷(其实就是重复消费的问题)。
启用自动提交后,对poll的调用将始终提交上一轮询返回的最后一个偏移量。 它不知道实际处理了哪些事件,因此在再次调用poll()之前,始终处理完poll()返回的所有事件至关重要, 因为和poll()一样,close()方法也会自动提交偏移量。
其实仔细思考,手动提交也存在这个问题,因为rebalance会先让所以的消费者停止消费,因为在kafka的角度来看,消息消费的那一刻,消费已经完成,所以停止消费的时候,你的逻辑很可能没有完成,那么你的offset 也很可能没有提交。在rebalance后分区重新分配的消费者会重新从服务端获取分区的offset值,此时可能是消费端提交前的offset,也会产生重复消费问题。
自动提交很方便,但它们不能给开发人员足够的控制以避免重复的消息。
如果 enable.auto.commit 设置为 false,则表示关闭自动提交功能,此时消费者需要手动调用 commitSync 或 commitAsync 方法来提交 offset。手动提交功能可以让消费者更灵活地控制何时以及如何提交 offset。
enable.auto.commit
为 falsepublic static void main(String[] args) { while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { // 模拟消息的处理逻辑 System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); try { //处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息 consumer.commitSync(); } catch (CommitFailedException e) { e.printStackTrace(); } } }
SrpingBoot中是通过ack.acknowledge()
达到手动提交的目的。
@KafkaListener(topics = "order", groupId = "order_group")
public void consume(ConsumerRecord<?, ?> record, Acknowledgment ack) {
System.out.println("Received: " + record);
ack.acknowledge();
}
从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。
commitSync()的问题在于,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束,需要注意的是同步提交会在提交失败之后进行重试。
在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS
enable.auto.commit
为 false下面都是三个测试用例都是异步提交,不同之处在于有没有去实现回调函数。建议生产环境中一定要实现,至少记录下日志。
@Test public void asynCommit1(){ while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); consumer.commitAsync(); } } @Test public void asynCommit2(){ while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); // 异步回调机制 consumer.commitAsync(new OffsetCommitCallback(){ @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception!=null){ System.out.println(String.format("提交失败:%s", offsets.toString())); } } }); } } @Test public void asynCommit3(){ while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); consumer.commitAsync((offsets, exception) ->{ if (exception!=null){ System.out.println(String.format("提交失败:%s", offsets.toString())); } }); } }
从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。
commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的,所以只要在程序停止前最后一次提交成功即可。
这里提供一个解决方案,那就是不论成功还是失败我们都将offsets信息记录下来,如果最后一次提交成功那就忽略,如果最后一次没有提交成功,我们可以在下次重启的时候手动指定offset。
try {
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
consumer.commitAsync();
}
} catch (CommitFailedException e) {
System.out.println(String.format("提交失败:%s", e.toString()));
} finally {
consumer.commitSync();
}
同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。
设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。
对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个 5000 条消息的例子,你可能希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); int count = 0; while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); for (ConsumerRecord<String, String> record : records) { // 数据的处理逻辑 System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); // 记录下offset 信息 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); if (count % 100 == 0) { // 回调处理逻辑是null consumer.commitAsync(offsets, null); } count++; } try { //处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息 consumer.commitSync(offsets); } catch (CommitFailedException e) { e.printStackTrace(); } }
在实际应用中,我们通常会根据业务需求的不同,选择不同的offset提交方式。
提交 offset 是消费者在消费完一条消息后,将当前消费的 offset 值更新到 Kafka broker 中的操作。提交 offset 的目的是为了记录消费进度,以便在消费者发生故障或重启时,能够从上次消费的位置继续消费。
重置 offset 是消费者在启动或运行过程中,将当前消费的 offset 值修改为其他值的操作。重置 offset 的目的是为了调整消费位置,以便在需要重新消费或跳过某些消息时,能够实现这个需求。
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
前面已经介绍过自动提交和手动提交这两种方式的区别和用法,这里不再赘述。需要注意的是,无论是自动提交还是手动提交,都不保证提交成功。因为 Kafka broker 可能发生故障或网络延迟,导致提交失败或延迟。因此,消费者需要处理提交失败或延迟的情况。
提交失败:如果提交失败,消费者可以选择重试或放弃。重试的话,可能会导致多次提交同一个 offset 值,但是不会影响正确性,因为 Kafka broker 会忽略重复的 offset 值。放弃的话,可能会导致下次启动时重新消费已经消费过的消息,但是不会影响完整性,因为 Kafka 消息是幂等的。
提交延迟:如果提交延迟,消费者可以选择等待或继续。等待的话,可能会导致消费速度变慢,或者超过 session.timeout.ms
参数设置的时间而被认为已经死亡。继续的话,可能会导致下次启动时漏掉一些没有提交成功的消息。
重置 offset 的方式有两种:手动重置和自动重置。
手动重置是指消费者主动调用 seek
或 seekToBeginning
或 seekToEnd
方法来修改当前消费的 offset 值。手动重置可以让消费者精确地控制从哪个位置开始消费。例如,如果想要重新消费某个分区的所有消息,可以调用 seekToBeginning
方法将 offset 设置为 0;如果想要跳过某个分区的所有消息,可以调用 seekToEnd
方法将 offset 设置为最大值;如果想要从某个具体的位置开始消费,可以调用 seek
方法将 offset 设置为任意值。
自动重置是指消费者在启动时根据 auto.offset.reset
参数来决定从哪个位置开始消费。消费者配置auto.offset.reset
表示Kafka中没有存储对应的offset信息的(有可能offset信息被删除),亦或者offset所处位置信息过期了的情况,消费者从何处开始消费消息。auto.offset.reset
参数有三个可选值:earliest, latest 和 none。
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
offset 的消费和保证主要涉及到两个方面:顺序性和一致性。
顺序性
顺序性是指 Kafka 消息是否按照发送和接收的顺序进行处理。Kafka 只保证分区内的顺序性,即同一个分区内的消息按照 offset 的顺序进行发送和接收。但是不保证主题内或跨主题的顺序性,即不同分区内的消息可能会乱序发送和接收。因此,如果需要保证主题内或跨主题的顺序性,需要在生产者和消费者端进行额外的处理,例如使用同一个分区键或同一个消费组。
一致性
一致性是指 Kafka 消息是否能够被正确地发送和接收,不会出现丢失或重复的情况。Kafka 提供了三种不同级别的一致性保证:最多一次(At most once),最少一次(At least once)和精确一次(Exactly once)。
如果提交的偏移量小于客户端处理的最后一条消息的偏移量,那么最后处理的偏移量与提交的偏移量之间的消息将被处理两次。
如下图:
如果提交的偏移量大于客户端实际处理的最后一条消息的偏移量,那么消费者组将忽略上次处理的偏移量与提交的偏移量之间的所有消息。如下图:
从源代码方面来说,CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时。因为KafkaConsumer.commitSync()有重试机制,所以一般的网络原因可以排除,发生这个异常的原因主要就是超时了,但是这个超时不是说提交本身超时了,而是消息的处理时间超长,导致发生了Rebalance,已经将要提交位移的分区分配给了另一个消费者实例。
熟悉的错误:
Exception in thread “main” org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
比如之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
这取决于 Consumer 端参数 max.poll.interval.ms
的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。
值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms
参数的值。不幸的是,session.timeout.ms
参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms
参数,将这部分含义从 session.timeout.ms
中剥离出来的原因之一。
这取决于 Consumer 端参数 max.poll.records
的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。
可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。
如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。
这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。
之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。
如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。
事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsu
故最近在使用kafka的过程中遇到了一个疑问,在查阅了一些资料和相关blog之后,做一下总结和记录。
问题:消费者在消费消息的过程中,配置参数
spring.kafka.listener .ackMode
设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?
spring.kafka.listener.ackMode
:指定消息确认模式,包括 RECORD、BATCH 和 MANUAL_IMMEDIATE等。可根据需求选择不同的确认模式,用于控制消息的确认方式。
ackMode是个枚举类型:
————————————————————————————————————————————————————————————
首先简单的介绍一下消费者对topic的订阅。
我们进入正题,对开头提出的问题的总结如些:
注意:以下情况均基于kafka的消费者关闭自动提交offset的条件下。亦是基于同一个消费者组的情况,因为不同的消费者组之间,他们彼此的offset偏移量是完全独立的。
如果消费端在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。
如果在消费的过程中有几条或者一批数据数据没有提交offset(比如异常情况程序没有走到手动提交的代码),后面其他的消息消费后正常提交offset至服务端,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序或者rebalance也不会重新消费。
消费端如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你没有提交offset的消息时你新增或者减少消费端,此时会发生rebalance现象,即可再次消费到这个未提交offset的数据,产生重复消费问题。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在发生rebalance现象之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。
总结就是如果消费端不提交或者抛异常,相当于一直没有提交offset,在此程序运行过程中不会重复消费。除非是重启,或者有新的消费者退出或者加入导致重新平衡的时候才会再次触发消费;而且有新的消费端正常消费且提交offset以后,服务端就会更新最新的offset,这样就算程序重启或者重新平衡也不会重新消费。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。