赞
踩
每个副本对象都保存了一组高水位值和 LEO 值,但实际上,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值
我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)
leader副本上保存这些远程副本的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。
Last Stable Offset 简称LSO,它与kafka的事物有关。
当消费端的参数isolation.level
设置为“read_committed"的时候,那么消费者就会忽略事务未提交的消息,既只能消费到LSO(LastStableOffset)的位置,默认情况下,”read_uncommitted",可以消费到HW(High Watermak)的位置。
也就是说开启kafka事务的同时,生产者发送了若干消息,(msg1,msg2)到broker中,如果生产者没有提交事务(执行CommitTransaction)那么对于isolation.level=read_committed的消费者而言是看不多这些消息的,而isolation.level=read_uncommitted则可以看到。
其实Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
props.put("enable.auto.commit",true);
props.put("auto.commit.interval.ms", 1000);
public static void main(String[] args) {
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record)->{
System.out.println("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic());
});
}
}
public static void main(String[] args) { while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { // 模拟消息的处理逻辑 System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); try { //处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息 consumer.commitSync(); } catch (CommitFailedException e) { e.printStackTrace(); } } }
存在的问题
下面都是三个测试用例都是异步提交,不同之处在于有没有去实现回调函数。建议生产环境中一定要实现,至少记录下日志。
@Test public void asynCommit1(){ while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); consumer.commitAsync(); } } @Test public void asynCommit2(){ while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); // 异步回调机制 consumer.commitAsync(new OffsetCommitCallback(){ @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception!=null){ System.out.println(String.format("提交失败:%s", offsets.toString())); } } }); } } @Test public void asynCommit3(){ while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); consumer.commitAsync((offsets, exception) ->{ if (exception!=null){ System.out.println(String.format("提交失败:%s", offsets.toString())); } }); } }
存在的问题
try {
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
});
consumer.commitAsync();
}
} catch (CommitFailedException e) {
System.out.println(String.format("提交失败:%s", e.toString()));
} finally {
consumer.commitSync();
}
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); int count = 0; while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); for (ConsumerRecord<String, String> record : records) { // 数据的处理逻辑 System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); // 记录下offset 信息 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); if (count % 100 == 0) { // 回调处理逻辑是null consumer.commitAsync(offsets, null); } count++; } try { //处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息 consumer.commitSync(offsets); } catch (CommitFailedException e) { e.printStackTrace(); } }
kafka 也为我们提供了这样的组件,让我们可以去在发生Rebalance的时候做一些操作,实现ConsumerRebalanceListener
接口,然后在订阅kafka topic 的时候传入,一个常见的常见就是对offset 的管理。因为Rebalance 可能导致数据重复消费。
对于Kafka而言,从poll方法返回消息的那一刻开始这条消息已经算是“消费”完成了,这个时候如果发生了重平衡,你的offset 没有提交的话,重平衡之后会重复消费。所以我们希望在重平衡之前进行offset 的提交。
public class ConsumerRebalance { private static KafkaConsumer<String, String> consumer; private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); /** * 初始化消费者 */ static { Properties configs = initConfig(); consumer = new KafkaConsumer<String, String>(configs); consumer.subscribe(Arrays.asList("flink_json_source_4"), new RebalanceListener(consumer)); } /** * 初始化配置 */ private static Properties initConfig() { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("enable.auto.commit", true); props.put("auto.commit.interval.ms", 1000); props.put("session.timeout.ms", 30000); props.put("max.poll.records", 1000); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); return props; } public static void main(String[] args) { while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no matadata")); }); consumer.commitAsync(); } } static class RebalanceListener implements ConsumerRebalanceListener { KafkaConsumer<String, String> consumer; public RebalanceListener(KafkaConsumer consumer) { this.consumer = consumer; } // 在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管partition的消费者就知道该从哪里开始读取了。 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { //用于跟踪偏移量的map consumer.commitSync(currentOffsets); } // 在重新分配partition之后和消费者开始读取消息之前被调用。 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { consumer.commitSync(currentOffsets); } } }
再均衡监听器有其他很多的作用,这里只是其应用的一个场景。
从源代码方面来说,CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时。因为KafkaConsumer.commitSync()有重试机制,所以一般的网络原因可以排除,发生这个异常的原因主要就是超时了,但是这个超时不是说提交本身超时了,而是消息的处理时间超长,导致发生了Rebalance,已经将要提交位移的分区分配给了另一个消费者实例
我们首先模拟一下自动提交会不会发生这样的情况
private static Properties initConfig(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("enable.auto.commit",true); props.put("auto.commit.interval.ms", 3000); props.put("session.timeout.ms", 30000); props.put("max.poll.records", 1000); props.put("max.poll.interval.ms", 5000); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); return props; } public static void main(String[] args) throws InterruptedException { while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record)->{ System.out.println("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic()); }); Thread.sleep(6000L); } }
我们发现并不会,然后我们模拟一下手动提交
/** * 初始化配置 */ private static Properties initConfig(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("enable.auto.commit",false); props.put("session.timeout.ms", 30000); props.put("max.poll.records", 1000); props.put("max.poll.interval.ms", 5000); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); return props; } public static void main(String[] args) throws InterruptedException { while (true) { // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record)->{ System.out.println("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic()); }); Thread.sleep(6000L); consumer.commitSync(); } }
这下我们就看到这个熟悉的错误了
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。