赞
踩
应用程序使用KafkaConsumer向Kafka订阅主题,并从订阅的主题中接收消息。不同于从其他消息系统读取数据,从Kafka读取数据涉及一些独特的概念和想法。如果不先理解这些概念,则难以理解如何使用消费者API。本文将先解释这些重要的概念,然后再举几个例子,演示如何使用消费者API实现不同的应用程序。
消费者从属于消费者群组,一个群组里的消费者订阅的是同一个主题,每个消费者负责读取这个主题的部分消息。
假设主题T1有4个分区,我们创建了消费者C1,它是群组G1中唯一的消费者,用于订阅主题T1。消费者C1将收到主题T1全部4个分区的消息,如下图所示:
如果在群组G1里新增一个消费者C2,那么每个消费者将接收到两个分区的消息。假设消费者C1接收分区0和分区2的消息,消费者C2接收分区1和分区3的消息,如下图所示:
如果群组G1有4个消费者,那么每个消费者将可以分配到一个分区:
如果向群组里添加更多的消费者,以致超过了主题的分区数量,那么就会有一部分消费者处于空闲状态,不会接收到任何消:
kafka 的 Topic 是可以同时被多个消费者组消费的,比如说有两个程序想要同时读取数据,就可以将这两个程序放到两个不同的消费者组中:
如上文所述,消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。主题发生变化(比如管理员添加了新分区)会导致分区重分配。
分区的所有权从一个消费者转移到另一个消费者的行为称为再均衡。再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(你可以放心地添加或移除消费者)。不过,在正常情况下,我们并不希望发生再均衡。
根据消费者群组所使用的分区分配策略的不同,再均衡可以分为两种类型。
主动再均衡
在进行主动再均衡期间,所有消费者都会停止读取消息,放弃分区所有权,重新加入消费者群组,并获得重新分配到的分区。
这样会导致整个消费者群组在一个很短的时间窗口内不可用。这个时间窗口的长短取决于消费者群组的大小和几个配置参数。
主动再均衡包含两个不同的阶段:第一个阶段,所有消费者都放弃分区所有权;第二个阶段,消费者重新加入群组,获得重新分配到的分区,并继续读取消息:
协作再均衡
协作再均衡(也称为增量再均衡)通常是指将一个消费者的部分分区重新分配给另一个消费者,其他消费者则继续读取没有被重新分配的分区。这种再均衡包含两个或多个阶段。
在第一个阶段,消费者群组首领会通知所有消费者,它们将失去部分分区的所有权,然后消费者会停止读取这些分区,并放弃对它们的所有权。
在第二个阶段,消费者群组首领会将这些没有所有权的分区分配给其他消费者。
虽然这种增量再均衡可能需要进行几次迭代,直到达到稳定状态,但它避免了主动再均衡中出现的“停止世界”停顿。
消费者会向被指定为群组协调器的broker(不同消费者群组的协调器可能不同)发送心跳,以此来保持群组成员关系和对分区的所有权关系。心跳是由消费者的一个后台线程发送的,只要消费者能够以正常的时间间隔发送心跳,它就会被认为还“活着”。
如果消费者在足够长的一段时间内没有发送心跳,那么它的会话就将超时,群组协调器会认为它已经“死亡”,进而触发再均衡。如果一个消费者发生崩溃并停止读取消息,那么群组协调器就会在几秒内收不到心跳,它会认为消费者已经“死亡”,进而触发再均衡。在这几秒时间里,“死掉”的消费者不会读取分区里的消息。在关闭消费者后,协调器会立即触发一次再均衡,尽量降低处理延迟。
在默认情况下,消费者的群组成员身份标识是临时的。当一个消费者离开群组时,分配给它的分区所有权将被撤销;当该消费者重新加入时,将通过再均衡协议为其分配一个新的成员ID和新分区。
如果想要当消费者重启后,消费的分区不变,继续消费之前消费的分区,就可以给消费者分配一个唯一的group.instance.id,让它成为群组的固定成员。
如果两个消费者使用相同的group.instance.id加入同一个群组,则第二个消费者会收到错误,告诉它具有相同ID的消费者已存在。
固定成员不能离开群组太久,在 session.timeout.ms
时间内重新加入会继续使用之前的 ID 以及消费之前的分区,当超过该时间后,该成员被认定为"死亡", 需要以新的身份加入群组,消费新的分区。
session.timeout.ms
参数不能设置的太大,以便在出现严重停机时自动重新分配分区,避免这些分区的读取进度出现较大的滞后。又不能设置的太小,避免在进行简单的应用程序重启时触发再均衡。
创建消费者
三个必填参数:bootstrap.servers
、key.deserializer
和value.deserializer
。
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<String,String>(props);
订阅 Topic
可以通过主题列表订阅一个或多个主题:
consumer.subscribe(Collections.singletonList("customerCountries")); ➊
也可以通过正则表达式匹配订阅多个主题:
consumer.subscribe(Pattern.compile("test.*"));
使用正则需要注意的是,消费者客户端会拉取集群的所有主题和分区元信息,可能会给网络带来很大的开销。而且为了能够使用正则表达式订阅主题,需要授予客户端获取集群全部主题元数据的权限,即全面描述整个集群的权限。
Duration timeout = Duration.ofMillis(100); while (true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %d, offset = %d, " + "customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.containsKey(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1; } custCountryMap.put(record.value(), updatedCount); JSONObject json = new JSONObject(custCountryMap); System.out.println(json.toString()); } }
PartitionAssignor根据给定的消费者和它们订阅的主题来决定哪些分区应该被分配给哪个消费者。Kafka提供了几种默认的分配策略。
区间(range)
轮询(roundRobin)
黏性(sticky)
协作黏性(cooperative sticky)
我们把更新分区当前读取位置的操作叫作偏移量提交。与传统的消息队列不同,Kafka不会提交每一条记录。相反,消费者会将已成功处理的最后一条消息提交给Kafka,并假定该消息之前的每一条消息都已成功处理。
消费者会向一个叫作 __consumer_offset的主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么实际作用。但是,如果消费者发生崩溃或有新的消费者加入群组,则会触发再均衡。
再均衡完成之后,每个消费者可能会被分配新的分区,而不是之前读取的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的位置继续读取消息。
重复消费消息
如果最后一次提交的偏移量小于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理:
消息丢失
如果最后一次提交的偏移量大于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会丢失:
commitSync()
主动提交当前偏移量。commitSync()
,那么一旦应用程序发生崩溃,就会有丢失消息的风险(消息已被提交但未被处理)。commitSync()
还没有被调用,那么从最近批次的开始位置到发生再均衡时的所有消息都将被再次处理。Duration timeout = Duration.ofMillis(100); while (true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } try { consumer.commitSync(); } catch (CommitFailedException e) { log.error("commit failed", e) } }
前边的手动提交,会阻塞等待 broker 响应,限制应用程序的吞吐量,此时可以使用consumer.commitAsync()
异步提交,无需等待 broker 响应。
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();
}
异步提交可能会出现的问题:
在提交成功或碰到无法恢复的错误之前,commitSync()会一直重试,但commitAsync()不会,这是commitAsync()的一个缺点。之所以不进行重试,是因为commitAsync()在收到服务器端的响应时,可能已经有一个更大的偏移量提交成功。
假设我们发出一个提交偏移量2000的请求,这个时候出现了短暂的通信问题,服务器收不到请求,自然也不会做出响应。
与此同时,我们处理了另外一批消息,并成功提交了偏移量3000。如果此时commitAsync()重新尝试提交偏移量2000,则有可能在偏移量3000之后提交成功。这个时候如果发生再均衡,就会导致消息重复。
Duration timeout = Duration.ofMillis(100); while (true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { if (e != null) log.error("Commit failed for offsets {}", offsets, e); } }); }
异步提交中的重试
可以用一个单调递增的消费者序列号变量来维护异步提交的顺序。每次调用commitAsync()后增加序列号,并在回调中更新序列号变量。在准备好进行重试时,先检查回调的序列号与序列号变量是否相等。如果相等,就说明没有新的提交,可以安全地进行重试。如果序列号变量比较大,则说明已经有新的提交了,此时应该停止重试。
Duration timeout = Duration.ofMillis(100); try { while (!closing) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync();➊ } consumer.commitSync();➋ } catch (Exception e) { log.error("Unexpected error", e); } finally { consumer.close(); }
commitAsync()
提交偏移量。这样速度更快,而且即使这次提交失败,下一次提交也会成功。commitSync()
会一直重试,直到提交成功或发生无法恢复的错误。前边的提交方式都是默认提交消息批次里的最后一个偏移量,我们也可以自己记录已消费的偏移量,然后提交特定的偏移量。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); ➊ int count = 0; .... Duration timeout = Duration.ofMillis(100); while (true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); ➋ currentOffsets.put( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata")); ➌ if (count % 1000 == 0) ➍ consumer.commitAsync(currentOffsets, null); ➎ count++; } }
❶ 用于跟踪偏移量的map。
❷ printf表示处理消息的过程。
❸ 在读取每一条记录之后,将下一条要处理的消息的偏移量更新到map中。提交的偏移量应该是应用程序要处理的下一条消息的偏移量,下一次就从这个位置开始读取。
❹ 我们决定每处理1000条记录就提交一次偏移量。在实际当中,可以基于时间或记录的内容来提交偏移量。
❺ 这里调用的是commitAsync()(没有回调,所以第二个参数是null),不过调用commitSync()也是可以的。当然,在提交特定偏移量时仍然要处理可能出现的错误,就像之前那样。
消费者API提供了一些方法,让我们可以在消费者分配到新分区或旧分区被移除时执行一些代码逻辑。我们所要做的就是在调用subscribe()
方法时传进去一个ConsumerRebalanceListener
对象。ConsumerRebalanceListener
有3个需要实现的方法:
public void onPartitionsAssigned(Collection partitions)
public void onPartitionsRevoked(Collection partitions)
public void onPartitionsLost(Collection partitions)
下面的例子演示了如何在失去分区所有权之前通过onPartitionsRevoked()方法来提交偏移量:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); Duration timeout = Duration.ofMillis(100); private class HandleRebalance implements ConsumerRebalanceListener { ➊ public void onPartitionsAssigned(Collection<TopicPartition> partitions) { ➋ } public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("Lost partitions in rebalance. " + "Committing current offsets:" + currentOffsets); consumer.commitSync(currentOffsets); ➌ } } try { consumer.subscribe(topics, new HandleRebalance()); ➍ while (true) { ConsumerRecords<String, String> records = consumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); currentOffsets.put( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, null)); } consumer.commitAsync(currentOffsets, null); } } catch (WakeupException e) { // 忽略异常 } catch (Exception e) { log.error("Unexpected error", e); } finally { try { consumer.commitSync(currentOffsets); } finally { consumer.close(); System.out.println("Closed consumer and we are done"); } }
❶ 首先,需要实现ConsumerRebalanceListener
接口。
❷ 在这个例子中,在分配到新分区时我们不做任何事情,直接开始读取消息。
❸ 如果发生了再均衡,则要在即将失去分区所有权时提交偏移量。我们提交的是所有分区而不只是那些即将失去所有权的分区的偏移量——因为我们提交的是已处理过的消息的偏移量,所以不会有什么问题。况且,我们会使用commitSync()
方法确保在再均衡发生之前提交偏移量。
❹ 把ConsumerRebalanceListener
对象传给subscribe()
方法,这样消费者才能调用它,这是非常重要的一步。
如果想从分区的起始位置读取所有的消息,或者直接跳到分区的末尾读取新消息,那么Kafka API分别提供了两个方法:seekToBeginning(Collection<Topic Partition>tp)
和seekToEnd(Collection<TopicPartition> tp)
。
Kafka还提供了用于查找特定偏移量的API。这个API有很多用途,比如,对时间敏感的应用程序在处理速度滞后的情况下可以向前跳过几条消息,或者如果消费者写入的文件丢失了,则它可以重置偏移量,回到某个位置进行数据恢复。
下面的例子演示了如何将分区的当前偏移量定位到在指定时间点生成的记录:
Long oneHourEarlier = Instant.now().atZone(ZoneId.systemDefault())
.minusHours(1).toEpochSecond();
Map<TopicPartition, Long> partitionTimestampMap = consumer.assignment()
.stream()
.collect(Collectors.toMap(tp -> tp, tp -> oneHourEarlier)); ➊
Map<TopicPartition, OffsetAndTimestamp> offsetMap
= consumer.offsetsForTimes(partitionTimestampMap); ➋
for(Map.Entry<TopicPartition,OffsetAndTimestamp> entry: offsetMap.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset()); ➌
}
❶ 首先,创建一个map,将所有分配给这个消费者的分区(通过调用consumer.assignment())映射到我们想要回退到的时间戳。
❷ 然后,通过时间戳获取对应的偏移量。这个方法会向broker发送请求,通过时间戳获取对应的偏移量。
❸ 最后,将每个分区的偏移量重置成上一步返回的偏移量。
如果我们确定马上要关闭消费者(即使消费者还在等待一个poll()返回),那么可以在另一个线程中调用consumer.wakeup()
。
如果轮询循环运行在主线程中,那么可以在ShutdownHook
里调用这个方法。
需要注意的是,consumer.wakeup()
是消费者唯一一个可以在其他线程中安全调用的方法。调用consumer.wakeup()
会导致poll()
抛出WakeupException
,如果调用consumer.wakeup()
时线程没有在轮询,那么异常将在下一次调用poll()
时抛出。不一定要处理WakeupException
,但在退出线程之前必须调用consumer.close()
。
消费者在被关闭时会提交还没有提交的偏移量,并向消费者协调器发送消息,告知自己正在离开群组。协调器会立即触发再均衡,被关闭的消费者所拥有的分区将被重新分配给群组里其他的消费者,不需要等待会话超时。
Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { System.out.println("Starting exit..."); consumer.wakeup(); ➊ try { mainThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }); ... Duration timeout = Duration.ofMillis(10000); ➋ try { // 一直循环,直到按下Ctrl-C组合键,关闭钩子会在退出时做清理工作 while (true) { ConsumerRecords<String, String> records = movingAvg.consumer.poll(timeout); System.out.println(System.currentTimeMillis() + "-- waiting for data..."); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } for (TopicPartition tp: consumer.assignment()) System.out.println("Committing offset at position:" + consumer.position(tp)); movingAvg.consumer.commitSync(); } } catch (WakeupException e) { // 忽略异常 ➌ } finally { consumer.close(); ➍ System.out.println("Closed consumer and we are done"); }
❶ ShutdownHook运行在单独的线程中,所以退出轮询循环最安全的方式只能是调用wakeup()。
❷ 一个比较长的轮询超时时间。如果轮询的时间足够短,并且不介意在退出之前等一小会儿,那么就没有必要调用wakeup(),只需在每次轮询时检查一下原子布尔变量即可。较长的轮询超时时间在处理低吞吐量主题时比较有用,因为当broker没有新数据返回时,客户端在轮询时占用的CPU时间会更少。
❸ 因为在另一个线程中调用了wakeup(),所以poll()会抛出WakeupException。你可能想捕获异常,确保应用程序不会意外退出,但实际上在这里无须对它做任何处理。
❹ 在退出之前,确保彻底关闭了消费者。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。