赞
踩
应用程序通过KafkaConsumer订阅一个topic之后收取数据来完成从kafka的数据读取。从kafka读取数据与从其他消息系统读取数据只有少许不同,几乎没用什么独特的概念。如果不理解这些概念,你将很难使用消费者API。我们首先对一些重要的概念进行解释,然后介绍一些示例,这些示例展示了使用消费者API在不同需求的应用程序中的不同方式。
为了了解如何从kafka读取数据,首先需要了解消费者和消费者组的概念。下面的章节讲对此进行介绍。
假定你有一个应用程序需要从kafka的某一个topic中读取消息,之后进行验证,并写入另外一个数据库中。在这种情况下,应用程序将创建要给消费者对象,之后开始接收消息,验证并将结果写入数据库。这可能在开始的一段时间内没用什么问题,但是,一段时间之后,kafka的topic中消息的写入速度大大超过了你消费程序消费并验证的速度。如果你只用单个消费者来读取和处理数据,那么你的应用程序处理的数据将会越来越落后,无法跟上topic中消息写入的速度。显然,有必要从topic上对消费者扩容,就像多个生产者同时向相同的topic写入一样,我们需要允许多个消费者从相同的topic中消费数据,把数据均分给多个消费者。
Kafka的消费者是消费者组的一部分,当多个消费者订阅相同的主题并属于同一消费者组的时候,同组的每个消费者将从topic的不同分区读取消息。
如下以Topic T1为例,它有4个分区,现在假定我们创建了一个消费者C1,它是消费者组G1中的唯一消费者,用它订阅topic T1。消费者C1将获得来自T1 4个分区的所有消息。
如果我们将另外一个消费者C2添加到组G1,每个消费者将分别从其中的两个分区获得消息,可能分区0和2的消息会被C1消费,而1和3的消息将会被C2消费。
如果G1有4个消费者,那么每个消费者将从单个分区读取消息。
如果我们向具有当个topic的组中添加的消费者超过了分区的数量,那么一些消费者将处于空闲状态,根本得到不消息。
从kafka的topic中,我们对消费性能扩容的主要方式就是增加消费者组中的消费者数量。kafka的消费者通常会使用一些高延迟的操作,如写入数据库或者对数据进行耗时的计算。在这种情况下,单个消费者不可能跟上topic中消息产生的速度,我们通过让每个消费者只消费分区的消息子集以增加更多消费者来共享负载时我们扩展消费者性能的主要方法。这事创建topic时配置大量分区的一个很好的理由,它允许在负载增加的时候添加更多的消费者。需要注意的时,在要给topic中增加比分区更多的消费者是没用意义的-----有些消费者是空闲状态。在第二章中介绍了如何选择一个topic的分区数量。
除了通过添加消费者以扩展单个应用程序之外,多个应用程序从同一个主题读取数据的情况也很常见。事实上,kafka的主要设计目标之一是让kafka的topic中的数据在整个组织中让更多的应用程序来使用。在这些情况下,我们希望每个应用程序获得所有的消息,而不是topic中消息的子集。要确保应用程序获得topic中的所有消息,需要确保应用程序使用自己的消费者组。与许多传统的消息队列系统不同,kafka可以扩展到大量的消费者和消费者组而不会降低性能。
在前面的例子中,如果我们添加一个新的消费者组G2,其中只有要给消费者,这个消费者将获得topic T1中的所有消息,而与G1的行为无关。G2可以又多个消费者,在这种情况下,每个消费者将获得分区的子集,这也就与G1中的情况一致。但是不管其他的消费者组如何处理,G2做为一个消费者组会获得所有的消息。
总而言之,建议为每个需要来自要给或者多个topic的所有消息的应用程序创建一个新的组。你可以将消费者添加到现有的消费者组,以扩展对topic消息的读取和处理,消费者组中额外的各个消费者只能获得消息的子集。
正如我们在上一节中所看到的那样,消费者组中的消费者共享其订阅的topic中分区的所有权。当我们向消费者组中添加一个新消费者时,它开始使用之前由另外一个消费者使用的分区。当消费者关闭或者节点故障崩溃的时候,相同的事情也会发生,这个消费者离开消费者组,他之前使用的分区将被同组中其他消费者接管。将分区重新分配给消费者的情况也会发生在topic被修改的情况中,如增加新的分区。
将分区的所有权从要给消费者转移到另外一个消费者被称之为分区重平衡。重平衡很重要,因为他使生产者提供了高可用性和可伸缩性(允许我们轻松安全的增加和删除消费者),但是再正常的系统中,它非常不受欢迎。因为在重平衡的过程中,消费者不能消费消息,重平衡过程基本上是整个消费者不可用的短时间窗口。此外,当分区从一个消费者移动到另外一个消费者的时候,消费者会失去当前状态,如果它正在缓存任何数据,那么它将需要刷新它的缓存,导致应用程序变慢,直到消费者再次设置它的状态。在本章中,我们将讨论如何安全的处理平衡以及如何避免不必要的重平衡。
消费者维护消费者组中的成员关系和分配给他们的分区所有权的方法是将心跳发送到指定的broker的组协调器coordinator。(对于不同的消费者组,可能不在同一个broker上)。只要消费者定期发送心跳,就假定它的状态是活着的。并能处理来自分区的消息。当用户轮询和提交offset的时候会发送心跳。
如果消费者停止发送心跳或者心跳的时间过长,导致超时,那么消费者组的协调器coordinator将默认这个消费者已死,并触发再平衡。如果一个消费者崩溃或者宕机导致停止处理消息,那么组协调器coordinator将会在没用心跳的情况下等待几秒的超时时间之后来判定消费者已死亡并触发新的重平衡。在这段时间内,不会处理死亡消费者分区中的任何消息。当干净的关闭一个消费者的时候,消费者将通知组协调器coordinator它将要离开,组协调器coordinator将立即触发再平衡,减少处理中的差距。在本章的后面,我们将讨论控制心跳的频率和会话超时的配置选项,以及如何设置它们来满足业务的需求。
在版本0.10.1中,kafka社区引入了一个单独的心跳线程,它将在轮询期间发送心跳。这将允许你将心跳的频率(消费者组检测到一个消费者崩溃并不再发送心跳所需的时间)与轮询的的频率分开(由处理从broker返回数据所花费的时间决定)。在新版本的kafka中,你可以配置应用程序在离开组并触发重平衡之前可以不进行轮询。这个配置用livelock配置。在应用程序没用崩溃之前,由于某种原因无法继续,此配置与session.time
out.ms,是分开的,它控制检测消费者崩溃和停止发送心跳所需的时间。本章其余部分将讨论一些旧的行为和挑战,以及程序员应该如何处理。本章讨论了如何处理需要更长的时间处理记录的应用程序。运行apache kafka 0.10.1之后版本的用户不用关心。如果你使用的是新版本,并且要处理消费时间比较长的记录,那么只需要对max.poll.interval.ms进行优化,它将在处理轮询记录之间配置更长的延迟。
当消费者希望加入要给消费者组的时候,他会向消费者组协调器发送一个JoinGroup的请求,第一个加入这个组的消费者将成为这个组的领导者。领导者从组协调器接收组中所有成员的列表(这包括最佳发送心跳的所有消费者,因此被认为是活着的消费者)并负责为每个消费者分配子集。它使用PartitionAssignor的实现来决定哪个分区应该由哪个消费者处理。
kafka有两个内置的分配策略,我们将在配置部分更深入的讨论。在决定分区分配之后,消费者leader将分配列表发送给组协调器。后者将此信息发送给所有的消费者。每个消费者只能看到自己分配到的分区。leader是组中唯一一个拥有完整使用者列表及其分配的分区的客户端进程。每次发生重平衡之后,这个过程都会重复。
在开始使用kafka进行消费的第一步就是创建一个KafkaConsumer实例。创建KafkaConsumer非常类似于创建KafkaProducer,你创建一个java属性的实例,其中包括要传递给消费者的属性。我们将在后面的章节深入讨论所有的属性。首先,我们需要三个强制非空的基本属性:bootstrap.servers, key.deserializer, 和
value.deserializer。
第一个属性bootstrap.servers,是到kafka集群的连接字符串,它的使用方式与kafkaProducer完全相同。可以参考第三章。其他两个属性key.deserializer与value.deserializer类似于为生产者定义的序列化器,但不是将指定的java对象转换为字节数组的类,而是需要将指定的可接收的字节数组转换为java对象。
还有4个属性他不是严格强制的,但现在我们假定它存在。group.id它指定消费者属于哪个消费者组,虽然可能会创建不属于任何消费者组的消费者,但是者种情况并不常见,因此在本章的大部分时间里,我们假定消费者是一个组的一部分。
如下代码创建了KafkaConsumer:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);
如果你读过关于生产者的第三章,你应该对记录中的大部分内容都很熟悉。我们假设使用字符串做为记录key和value的序列化方式。这里唯一新增的就是group.id ,这是消费者所属于的消费者组名称。
创建了消费者对象之后,下一步就是订阅一个或者多个topic,subcribe()方法接收topic的list做为参数,因此使用起来非常简单:
consumer.subscribe(Collections.singletonList("customerCountries"));
在此我们只创建了一个包含单个元素的topic名称的list,topic名称为customerCountries。
还可以使用正则表达式进行订阅,一个表达式可以匹配多个topic的名称,如果有人创建了一个具有匹配名称的topic,那么重平衡几乎会立即发生,消费者将开始从新的topic消费。这对于需要使用多个topic并可以处理topic包含不同类型数据的应用程序非常有用。使用正则表达式订阅多个topic的场景最常用在kafka和另外要给系统之间进行数据复制的应用程序上。
如果需要订阅所有的test主题,我们可以如下:
consumer.subscribe("test.*");
消费者API的核心是一个简单的循环,用于轮询服务器以获取数据。一旦用户订阅了topic,轮询的循环就会封装处理协调、分区重平衡,心跳和数据获取的所有细节,给开发人员留下一个干净简单的API,只会返回所取得分区中的数据。消费者的主要内容如下:
try { //这实际上是一个死循环,消费者通常是长时间运行的应用程序,不断轮询kafka以获取更多的数据,在本章的后续会对优雅的关闭消费者进行描述。 while (true) { //这是本章中最重要的一行,与鲨鱼必须游动才能避免死亡一样,消费者必须持续的从kafka拉取数据,否则消费者将被判定死亡,他们正在消费的分区会被重平衡给其他消费者消费。 //poll方法的参数是一个超时间隔,它控制在消费者缓冲区中没有可用数据时对poll的阻塞时间。 //如果设置为0,poll将立即返回;否则它将等待指定的毫秒数,等待broker的数据到达。 ConsumerRecords<String, String> records = consumer.poll(100); //poll返回的记录列表,每条记录都包含该记录的topic和分区,该分区记录的offset,还有该记录的key和value. //poll接收一个超时时间参数,这将指定轮询返回数据或者不返回数据时的等待时间,这个之通常由应用程序的响应需求决定。 for (ConsumerRecord<String, String> record : records) { log.debug("topic = %s, partition = %s, offset = %d, customer = % s, country =%s\n ", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.countainsValue(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1; } custCountryMap.put(record.value(), updatedCount) JSONObject json = new JSONObject(custCountryMap); //轮询处理通常以数据存储写入或者更新存储的记录来结束。 //这是为了保持每个程序消费者的数量,因此我们更新了一个hashtable并将结果打印为json。实际操作中一般是将结果存储与数据库。 System.out.println(json.toString(4)) } } } finally { //在退出之前要确保消费者关闭,这将关闭网络连接和socket。它还将立即触发消费者组进行重平衡。而不是等到消费者组通过心跳才知道消费者已下线,这样需要花费更多的时间。将导致某个分区的消费卡顿。 consumer.close(); }
轮询循环不仅仅是获取数据,第一次对使用者调用poll的时候,它负责查找GroupCoordinator,加入新的消费者并接收分区分配。如果重平衡被触发,它将在轮询循环中进行处理。当然,让消费者保持活动状态的心跳是从轮询循环中发送的。出于这个原因,我们试图确保在迭代之间所做的任何处理都是快速有效的。
你不能在一个线程中同时调用属于同一组的多个消费者,你也不能让多个线程安全的使用一个消费者。一个线程对应一个消费者。要在一个应用程序的同一个组中运行多个消费者,你需要给每个消费者分配一个线程来进行。将消费者逻辑包装在线程的对象中,然后通过java的ExeccutorService启动多个线程,每个线程都有一个消费者。Confluent的博客上有一篇教程。
到目前为止,我们主要学习了消费者的API,但是我们只研究了一些主要的配置参数,如bootstrap.servers, group.id,
key.deserializer, 和 value.deserializer。消费者的全部配置都记录在了ApacheKafka的官方文档中,大多数参数都有合理的默认值,不需要修改,但是有些参数会影响用户的性能和可用性,我们对重要的一些参数就行说明。
此属性运行将消费者指定获取记录时希望从broker接收的最小数据量。如果从broker接收的记录的字节少于fetch.min.bytes broker将等待,直到有更多的可用消息。然后将记录发送给消费者。这减少了消费者和broker的负载,因为在topic没用太多的新消息的情况下,他们必须处理更少的来回消息。如果消费者没有太多的可用数据而过多的使用系统CPU资源,你可以将这个值设置得高于默认值。在有大量消费者的情况下可以有效的降低broker的负载。
通过设置fetch.min.bytes 告诉kafka的等待时间,直到有足够的数据时才响应。fetch.max.wait.ms则是消费者可以控制的等待响应时间。默认情况下,kafka将等待高达500ms,如果没有自购的数据写入到topic以满足返回的最小数据量,这将导致高达500ms的延迟。如果需要降低这个延迟,你可以将fetch.max.wait.ms配置到一个比较低的数值,如果你将fetch.max.wait.ms设置为100而fetch.max.bytes为1M,那么kafka将无论有1MB数据返回或者在100ms之后,都会响应数据。
此属性控制服务器每个分区返回的最大字节数,默认时1MB,这意味着kafkaConsumer.poll()返回时,ConsumerRecords记录中,每个分区最多分配给消费者max.partition.fetch.bytes字节数。因此,如果一个topic有20个分区,而你有5个消费者,那么每个消费者需要有4MB每次供ConsumerRecords使用。事实上,你将会分配更多的内存,因为如果消费者组中其他消费者宕机,每个消费者将要处理更多的分区。max.partition.fetch.bytes的配置必须大于broker将要接收的最大消息。由broker的max.message.size确定。如果broker的最大消息大小过大导致消费者无法消费,那么将导致消费者挂起。设置max.partition.fetch.bytes的另外一个考虑是需要结合消费者处理数据的时间。在前面说过,消费者必须频繁的调用poll,以避免会话超时和重平衡。如果单个的poll返回的数据量非常大,那么消费者可能需要更长的时间来处理,这意味着不能及时发送下一个poll,导致会话超时。这个设置也会避免这种情况出现。如果出现这种情况,有两种操作,要么是降低max.partition.fetch.bytes,要么是增加会话超时的时间。
消费者在被认为存活的情况下,与broker通信的超时时间默认为3秒。如果超过这个时间消费者没有向GroupCoordinator发送心跳,那么这个消费者会被判定死亡,GroupCoordinator将触发消费者组的重平衡。以便将分区从死亡的消费者分配给组内其他的消费者。这种情况与心跳密切相关,KafkaConsumer poll()方法将控制心跳的频率,而session.timeout.ms将确定没有心跳的情况下broker将为消费者保留多久。因此这两个值通常应该一起协调修改,heartbeat.interval.ms必须小于session.timeout.ms,通常为其三分之一。如果session.timeout.ms是3秒,那么heartbeat.interval.ms就应该是1秒,设置session.timeout.ms如果低于默认值,则运行消费者组更快的检测故障并从故障中恢复。但也可能导致不必要的重平衡,因为消费者完成轮询循环或者垃圾收集的时间会更长。设置session.timeout.ms过高,虽然会减少意外带来的重平衡,但是也意味着需要更长的时间来检测真正的故障。
当消费者开始读取没有提交offset的分区或者已提交的offset无效时,此属性可以控制消费者的行为。(通常是消费者停机时间太长所持有的offset以及在broker中失效。) 这个配置的默认值是latest,这意味着如果缺少有效的offset,消费者将从最新的记录(消费者运行后写入的记录)开始读取。另外一种选择就是earliest,这意味着缺少有效的offset,消费者将从分区允许的最开始位置读取所有的数据。
我们讨论了提交偏移量的不同选项,此参数是控制消费者是否自动提交偏移量。默认值是true。如果希望在对提交偏移量进行控制,那么将这个值改为false。这个参数对于减少重复和避免数据丢失是非常必要的。如果将enable.auto.commit设置为true,那么你可能还需要对auto.commit.interval.ms进行配置来控制提交offset频率。
分区只能分配给消费者组中的一个消费者进行消费,PartitionAssignor是一个可以给定消费者分配哪些所订阅的topic的类。默认情况下,kafka有两种分配策略:
参数partition.assignment.strategy允许对分区策略进行配置。默认值是org.apache.kafka.clients.consumer.RangeAssignor实现了上述的描述的Range内容。你可以替换掉org.apache.kafka.clients.consumer.RoundRobinAssignor。自定义分区策略。在这种情况下partition.assignment.strategy的值应该是自定义的类名称。
可以是任意字符串,broker将使用它来标识消费者发送的消息,用于日志记录或者统计分析。
这控制了对poll方法一次调用将返回的最大记录数,这有助于轮询循环中需要处理的数据量。
这是socket连接在写入和读取数据时使用tcp发送和接收缓冲区的大小,如果设置为-1,将使用操作系统的默认配置。当生产者或者消费者与不同的数据中心进的broker进行通信时,增加这个参数值是个比较好的选择,因为此种情况下网络的延迟更高,带宽更低。
无论何时调用poll,它都会返回写入kafka的记录,而我们的组内其他消费者没有读取这些记录。这意味着我们有一种方法乐意跟踪组的消费者分别读取了哪些记录。如前面所示,kafka的独特特性之一是它不像许多JMS队列那样来跟踪来自消费者的消息确认。相反,它允许消费者使用kafka来跟踪他们在每个分区中的位置(offset)。
我们将更新分区中当前位置的操作称为commit。
消费者如何提交offset?它通过一个特殊的topic __consumer_offsets 为每个分区保存提交的offset。只要你所有的用户都在运行,你会感知不到它的存在。然而,如果一个消费者死亡,或者一个新的消费者加入该消费者组,这将触发重平衡。在重平衡操作之后,每个消费者都可能会分配了一组新的分区,而不是之前处理的哪个分区。为了知道从哪开始工作,消费者读取每个分区的最新提交的offset,之后从哪个位置继续读取消息。
如果提交的offset小于消费者处理的最后一条消息的offset,那么最后处理的offset和提交的offset之间的消息将被消费两次。
如下图:
如果提交的offset大于客户端消费者实际处理的最后一条消息的offset,那么消费者组将错过最后处理的offset和提交的offset之间的所有消息。
显然,管理offset对客户端应用程序有很大的影响。
KafkaConsumer API提供了多种方式来进行commit:
提交commit最简单的办法就是允许消费者自动为你做这件事,如果配置enable.auto.commit=true,每隔5秒,消费者将提交客户端从poll中接收到的最大offset。5秒间隔是通过auto.commit.interval.ms设置。就像消费者中其他一些事情一样,自动提交由轮询循环驱动,每次轮询的时候,消费者会检查是否应该提交offset。如果是,它将提交上次轮询中返回的offset。
然而,在使用这种方法之前,需要了解其可能带来的后果。
假定默认情况下,自动提交5秒发生一次,如果我们最近一次commit之后3秒,要给reblance操作被触发,重平衡操作之后,所有的消费者都将从最后一个已提交的offset开始消费,在本例中,offset是三秒之前的,因此在这三秒内到达的所有消息将被重复处理两次。可以将提交的时间间隔减少,更加频繁的提交并减少记录重复的时间窗口,但是不可能完全消除。
启动自动提交之后,对轮询的调用将始终提交上次轮询返回的最后的偏移量。它不知道实际处理了哪些消息,因此在再次调用poll之前保证将上一次poll的消息完全处理是至关重要的。
如poll方法一样,close方法也会自动提交offset,这通常不是问题,但是在处理异常或者提前退出轮询循环的时候要注意,自动提交很方便打算他们没有给开发任意足够的控制权来避免消息重复消费问题。
大多数开发人员对提交offset的时间进行了更多的配置,既可以消除丢失数据的可能性,又可以减少对重平衡带来重复消费的数据量。消费者API可以由开发人员选择有意义的点提交offset,是不是基于计时器。
设置auto.commit.offset=false,只有当应用程序选择提交offset的时候,才会提交offset。最简单的提交api是commitSync().这个API将poll返回的最新偏移量,并在偏移量提交后返回,如果由于某种原因提交失败,则抛出异常。务必记住,commitSync()将提交poll()返回最新的offset。因此请确保在处理完集合中所有记录之后调用commitSync().否则可能丢失消息。当触发reblance时,从最近一批开始到reblance的时候所有消息被处理了两次。下面是我们在处理完最新一批消息后如何使用commitSync提交offset。
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } try { consumer.commitSync(); } catch (CommitFailedException e) { log.error("commit failed", e) } }
假定通过打印一条记录内容,我们完成对消息的处理,你的应用程序可能会做更多的事情,如修改、聚合等,或者通知用户完成重要的事件。你应该根据你的用例决定什么时候记录是完成的。
完成当前批次处理中的所有记录的处理之后,在轮询其他消息之前,调用commitSync提交批处理中的最后一个offset。
只要没有无法恢复的错误,commitSync就会尝试重试提交。如果发生了无法恢复的错误,我们除了记录错误之外没有更好的办法。
手动提交的一个缺点就是应用程序被阻塞,直到broker响应了提交请求。这将限制应用程序的吞吐量。通过降低投入频率可以提高吞吐量,但是这样我们增加了重平衡之后潜在的重复消费的数量。
另外一个选项是通过异步API提交,我们无需等待broker响应提交,只需发送请求就可以继续:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();
//提交最新的offset
}
缺点是,虽然commitSync会重复提交直到成功或者遇到不可控的失败,但是commitAsync方法不会重试。这个原因是当commitSync收到来自服务端的响应时,可能以及有一个后续已提交成功的commit,所以不会重试。假定我们commit offset为2000时发送了一个请求,存在通信问题,因此broker永远不会得到请求,因此永远不会响应,我们又成功处理了一批,成功提交了offset 3000。如果commitAsync现在重试之前的失败提交,它可能会成功提交2000的offset在3000已经被成功处理之后,在reblance的情况下,这将导致更多的重复消费。
我们提到了提交顺序的重要性和复杂性。commitAsync还提高了一个选项来传入一个回调函数callback,该callback将在broker响应的时候触发。通常使用回调来记录提交错误或者在一个统计度量中的计数。但是如果你想通过callback来实现重试,你需要注意提交顺序问题:
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (e != null) log.error("Commit failed for offsets {}", offsets, e); } }); //我们发送commit并继续执行,但是如果commit提交失败,则将对失败和offset进行记录。 }
为异步重试获取提交顺序的一个简单的方式是使用一个单调连续递增的序列号。每次提交时增加序列号,并在提交到commitAsync回调时添加序列号。当准备发送重试时,检查回调得到提交的序列号是否等于实例变量。如果时,则没有更新的提交,可以安全的重试,如果实例序列号较高,则不需要重试,因为已经有新的提交了。
通常在没有重试的情况下偶尔提交失败并不是一个大问题,因为如果问题是临时性的,下面的提交肯定会成功。但是,如果我们知道这是关闭消费者之前或者reblance之前的最后一次提交,我们特别希望确保提交成功。
因此,一种常见的模式就是在关闭之前将commitAsyncy与commitSync结合使用。如下是其工作原理:
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } //选择了更快的异步提交方式,如果一个提交失败,下个提交将重试 consumer.commitAsync(); } } catch (Exception e) { log.error("Unexpected error", e); } finally { try { //一旦我们选择关闭,则没有下一次了,我们将调用commitSync直到成功为止 consumer.commitSync(); } finally { consumer.close(); } }
提交最新的offset只允许按照你已经完成的批次的频率去提交,但是如果你想更频繁的提交呢?如果poll返回了一个巨大的批次,而你希望在处理的中间阶段提交offset,以避免reblance发生的时候对这些消息重复消费,该怎么办?不能只调用commitSync或commitAsync,这将提交这个批次中最后一条记录的offset.有部分消息你还没进行处理。
幸运的是,消费者API允许你在commitSync和commitAsync提交的时候传递指定的分区和offset。如果正在处理一批记录,并且topic “customers”中从分区3得到的最后一条消息的offset为5000,那么可以调用commitSync来提交offset 5000用于topic "customers"中的分区3.由于你的消费者可能使用了多个分区,你将需要记录所有分区上的offset,这将增加代码的复杂性。
如下是指定offset提交的示例:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); //这个map将记录所有分区的offset int count = 0; .... while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); //print是处理操作的替代方法 currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata")); //在读取每条记录之后,我们使用希望处理的下一条消息的offset更新offset的map,这是我们下次开始的地方。 if (count % 1000 == 0) //我们决定1000条记录提交当前的offset,在应用程序中,可以根据记录时间和内容进行提交。 consumer.commitAsync(currentOffsets, null); //我们选择异步提交,同步和异步都支持带参数的方法。之后你需要添加前面章节的特殊处理方式 count++; } }
正如我们前文描述的提交offset的内容,消费者希望在程序退出和reblance发生之前做出一些清理工作。
如果你指定你的消费者即将失去一个分区的所有权,你将希望提交你最后处理的消息的offset。如果你的消费者程序维护了一个缓冲区,其中的事件它只是偶尔处理,如我们在使用pause功能的时候使用了currentRecords map。你会希望在失去分区所有权之前处理你累计的这些事件,包括你还需要关闭的文件句柄、数据库连接等。
消费者API允许你从消费者中添加或者删除分区的时候运行自己的代码,可以通过在调用我们之前讨论的subscribe方法时传入一个ConsumerReblanceListener来实现。ConsumerReblanceListener有两个方法你可以实现:
这个示例将展示如何使用onPartitionsRevoked()在失去分区所有权之前提交offset。在下一节中我们将展示一个更复杂的示例,它也将对onPartitionsAssigned()进行演示:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); //开始实现ConsumerRebalanceListener. private class HandleRebalance implements ConsumerRebalanceListener { public void onPartitionsAssigned(Collection<TopicPartition> partitions) { //得到一个新的分区的时候不需要做任何处理 } public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets); //当我们由于rebalance而丢失一个分区的时候,我们需要提交offset,注意提交的是当前处理的offset而不是批次的offset.我们使用commitSync确保offset被提交 consumer.commitSync(currentOffsets); } } try { //将ConsumerRebalanceListener传递给subscribe方法是 最重要的部分 consumer.subscribe(topics, new HandleRebalance()); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata")); } consumer.commitAsync(currentOffsets, null); } } catch (WakeupException e) { // ignore, we're closing } catch (Exception e) { log.error("Unexpected error", e); } finally { try { consumer.commitSync(currentOffsets); } finally { consumer.close(); System.out.println("Closed consumer and we are done"); } }
到目前为止,我们已经了解如何使用poll开始使用每个分区中最近提交的offset的消息,并且有序处理所有消息。但是在某些时候,你需要从指定的offset开始读取。
如果你想从开始时读取整个分区,或者你想跳过所有的分区的旧消息只消费新写入的消息,有一个专门的API。seekToBeginning(TopicPartition tp)
和 seekToEnd(TopicPartition tp).
但是kafka API也允许你指定offset读取,在某些方面非常实用。例如:返回一些消息或者跳过一些消息。可能一个对时间敏感的程序想跳过更相关的消息,这个功能最令人兴奋的是可以将offset存储在kafka之外的系统。
在这个场景中,你的应用程序正在读取来自kafka的消息,并处理数据,然后将结果存储在数据库、nosql或者hadoop中,假定我们并不清楚。假如我们不想丢失任何数据,也不想在数据库中存储两次相同的结果。那么消费者可以像如下这样:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
currentOffsets.put(new TopicPartition(record.topic(),
record.partition()),
record.offset());
processRecord(record);
storeRecordInDB(record);
consumer.commitAsync(currentOffsets);
}
}
在这个例子中,我们非常固执的在处理每条消息之后提交offset,但是在记录存储到数据库之后但是offset还没提交之前,程序任然可能崩溃,从而导致再次处理重复的数据,导致数据库记录重复。
如果有一种方法可以在一个原子操作中同时存储和提交offset,那么这种情况是可以避免的,要么记录和offset同时提交成功,要么都失败。只要数据被写入数据库,kafka就一定能成功提交offset。事实上这是不可能的。
但是如果我们在同一个事务中同时将offset和消息写入数据库会怎么样呢 ?然后我们指定我们完成了记录的处理,并提交了offset,要么没有提交成功,将重新处理。
现在唯一的问题是,如果记录存在在数据库而不是kafka,那么当它被分配一个分区的时候,我们的消费者如何知道从哪开始读取?这正是seek()方法的用途。当消费者启动或者分配了新的分区的时候,它可以在数据库中查找offset并指定该位置通过seek消费。
示例如下:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener { public void onPartitionsRevoked(Collection<TopicPartition> partitions) { //这里假定向数据库提交事务 包括消息记录和offset commitDBTransaction(); } public void onPartitionsAssigned(Collection<TopicPartition> partitions) { for (TopicPartition partition : partitions) //从数据库获取offset并通过seek方法指定offset读取 consumer.seek(partition, getOffsetFromDB(partition)); } } consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer)); consumer.poll(0); //当消费者启动的时候,在我们订阅主题之后,我们将调用poll()一次,以确保我们加入了一个消费者组并获得分区,然后我们立即通过seek当前分区的正确offset。需要注意的是seek只更新我们使用的offset,下一个poll将获取正确的消息,如果seek中出错,比如offset不存在,那么poll将出异常。 for (TopicPartition partition: consumer.assignment()) consumer.seek(partition, getOffsetFromDB(partition)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { processRecord(record); storeRecordInDB(record); storeOffsetInDB(record.topic(), record.partition(), record.offset()); //维护一个在数据库中存储的offset列表,假定记录更新很快,所以我们对每个记录都进行更新的话,会导致提交变慢,因此我们只在批处理最后进行提交,这可以通过不同的方式优化。 } commitDBTransaction(); }
通过在外部存储offset和数据,有许多方法可以实现exactly-once语义。但是所有的这些方法都需要使用ConsumerRebalanceListener和seek方法,以确保offset及时存储,并确保消费者从正确的位置开始读取。
在本章之前我们讨论了轮询循环时,我们说过你不需要担心消费者在轮询循环的死循环中,我们将讨论如何优雅的退出循环。所以如下将进行讨论。
当你决定退出轮询循环时,你将需要另外一个线程老调用consumer.weakup()。如果在主线程中运行消费者的轮询循环,则可以通过shutdownHook完成。需要注意的是consumer.wakeup()是从其他线程调用中唯一安全的方法。调用weakup将导致poll抛出WakeupException退出,或者使用在线程没有等待轮询时调用了wakeup方法,则在下次调用poll的时候抛出异常。WakeupException不需要处理,但是在退出线程之前,必须调用consumer.close方法。如果需要,关闭消费者将提交offset,并向group Coordinator发送消费者离开组的消息。group Coordinator将即时触发reblance,你不需要再等待会话超时就可以将要关闭消费者的分区分配给组中另外一个消费者。
在主线程中退出的代码如下,这个示例代码并不完整,你可以在http://bit.ly/2u47e9A查看完整示例。
Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { System.out.println("Starting exit..."); consumer.wakeup(); try { mainThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }); //shutdown在一个独立的线程中运行,我们采用唯一安全的方法 wakeup。 try { // looping until ctrl-c, the shutdown hook will cleanup on exit while (true) { ConsumerRecords<String, String> records = movingAvg.consumer.poll(1000); System.out.println(System.currentTimeMillis() + " -- waiting for data..."); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } for (TopicPartition tp: consumer.assignment()) System.out.println("Committing offset at position:" + consumer.position(tp)); movingAvg.consumer.commitSync(); } } catch (WakeupException e) { //调用weakup将导致WakeupException异常,确保应用程序不会额外退出,但是不需要做其他处理。 // ignore for shutdown } finally { consumer.close(); System.out.println("Closed consumer and we are done"); //确保消费者被close } }
正如前一章所说,kafka生产者需要将通过序列化器将对象转换为字节数组,然后再发送给kafka。类似的,kafka消费者需要通过反序列化器从kafka中将接收到的字节数组转换为java对象。在前面的示例中,我们假设每个消息的key和value都是字符串,并且在消费者配置中使用默认的StringDeserializer。
在关于kafka生产者的第三章中,我们看到了如何使用序列化自定义类型,以及如何使用avro和avroSerializer从模式定义中生成Avro对象,然后在为kafka生成消息时使用他们进行序列化。现在我们来看一些如何使用自己的对象创建自定义反序列化器以及如何使用Avro及其反序列化器。
很明显,用于kafka生产者的序列化器必须与用于消费者消费的反序列化器匹配,用IntSerializer序列化器然后用StringDeserializer反序列化器不会产生正确的结果,这一文之做为开发人员,你需要对每个topic使用了哪些序列化器进行了解,并确保每个topic只包含你使用的反序列化器能够解析该数据。这是使用avro和模式存储进行序列化和反序列化的好处。AvroSerializer可以确保写入特定topic的所有数据都与模式兼容,这意味着可以使用匹配的反序列化器和模式对其进行反序列化。任何兼容性方面的错误,在生产者或者消费者方面都可以用适当的错误消息轻松的进行捕获,这意味着你不需要尝试调试字节数组来处理序列化器的错误。
我们将快速展示如何编写自定义的反序列化器开始,尽管这是一种不常用的方法,然后我们将使用avro来进行反序列化。
以第三章中的序列化器示例,如下写一个反序列化器。
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}
自定义的反序列化器如下:
import org.apache.kafka.common.errors.SerializationException; import java.nio.ByteBuffer; import java.util.Map; public class CustomerDeserializer implements Deserializer<Customer> { @Override public void configure(Map configs, boolean isKey) { // nothing to configure } @Override public Customer deserialize(String topic, byte[] data) { int id; int nameSize; String name; try { if (data == null) return null; if (data.length < 8) throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected"); ByteBuffer buffer = ByteBuffer.wrap(data); id = buffer.getInt(); String nameSize = buffer.getInt(); byte[] nameBytes = new Array[Byte](nameSize); buffer.get(nameBytes); name = new String(nameBytes, 'UTF-8'); return new Customer(id, name); } catch (Exception e) { throw new SerializationException("Error when serializing Customer to byte[] " + e); } } @Override public void close() { // nothing to close } }
消费者端也需要实现Customer类,类和序列化器都需要在生产者和消费者的应用程序上匹配。在又许多消费者和生产者共享数据访问权限的大型组织中,这可能会非常有挑战性。
反序列化器只是颠倒了了序列化器的逻辑,从字节数字中将Customer对象的ID和名称获取出来,重新构造一个对象。
消费者代码如下:
Properties props = new Properties(); props.put("bootstrap.servers","broker1:9092,broker2:9092"); props.put("group.id","CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.CustomerDeserializer"); KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props); consumer.subscribe("customerCountries") while(true) { ConsumerRecords<String, Customer> records = consumer.poll(100); for (ConsumerRecord<String, Customer> record : records) { System.out.println("current customer Id: " + record.value().getId() + " and current customer name:" + record.value().getName()); } }
同意需要说明的是,不建议实现自定义的反序列化器,它需要和生产者、消费者紧密关联,而且非常脆弱。容易出错,最好的解决办法是用标准的消息格式。入JSON、Thrift、Protobuf、或者Avro.如下将介绍如何使用Avro实现反序列化器操作。有关apache avro的背景知识、模式和模式兼容等功能,请参考第三章。
以第三章所列举的avro和其实现的Customer对象为例,为了消费这些消息,我们需要实现一个类似的反序列化器。
Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.serializer", "org.apache.kafaka.common.serialization.StringDeserializer"); //使用KafkaAvroDeserializer来反序列化消息 props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); //schema.registry.url是一个新参数,指向模式存储的位置,这样可以使用生产者注册的模式来反序列化消息。 props.put("schema.registry.url", schemaUrl); String topic = "customerContacts" KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url)); consumer.subscribe(Collections.singletonList(topic)); System.out.println("Reading topic:" + topic); while (true) { ConsumerRecords<String, Customer> records = consumer.poll(1000); for (ConsumerRecord<String, Customer> record: records) { System.out.println("Current customer name is: " + record.value().getName()); } consumer.commitSync(); }
到目前为止,我们已经讨论了消费者组,在消费者组中分区被自动分配给消费者,并在消费者被添加或者从消费者组中删除的时候reblance。在通常情况下,这些行为都是你所期望的,但是,有时候你希望这些操作更简单。有时,你指定有一个消费者总是从topic中的所有分区或特定分区读取数据,在这种情况下,没有必要进行reblance或者分组。只需要分配特定的消费者的topic或者特定的分区,消费者偶尔提交offset即可。
当你确切的知道你应该通过消费者去消费哪些分区的时候,你就不必要订阅某个topic,而是为自己分配几个分区。消费者既可以订阅topic,也可以自己分配分区。但不能能使具备这两个功能。
下面的示例,说明消费者如何为自己分配特定的topic分区,并使用他们:
List<PartitionInfo> partitionInfos = null; partitionInfos = consumer.partitionsFor("topic"); //我们首先向集群中查询topic可以使用的分区,如果你计划只订阅其中的部分分区,那么你可以跳过这一步。 if (partitionInfos != null) { for (PartitionInfo partition : partitionInfos) partitions.add(new TopicPartition(partition.topic(), partition.partition())); consumer.assign(partitions); //一旦我们知道了我们想要的分区,我们就用assign()传入分区list。 while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record: records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitSync(); } }
除了缺乏reblance和需要手动查询分区之外,其他一切都与之前的方法相同。如果有人向topic添加了新分区,这种情况下不会通知消费者。你需要通过定期检查consumer.partitionsFor()来处理这个问题。或者只要在添加的分区时调用即可。
在本章中,我们讨论了java KafkaConsumer的客户端,踏实org.apache.kafka客户端jar的一部分。在编写本文时,Apache Kafka仍然有两个用scala编写的老的客户端,他们也是kafka Consumer包核心模块的一部分。
这些消费者被称为SimpleConsumer,SimpleConsumer是kafka Consumer API的一个封装,允许从特定的分区读取消息。另外一个被称为高级API的ZookeeperConsumerConnector。高级消费者和当前的消费者比较类似,因为它有消费者组并且支持reblance。但是它使用zookeeper来管理消费者组,并没有像本文之前描述的那样对commit和reblance进行相同的控制。
因为当前的消费者支持这两种情况,并为开发人员提供了更多的可靠性和控制,所以我们将不再讨论这些旧的API。如果你对他们感兴趣。请慎重选择。可以在Apache Kakfa官方文档中了解更多的消息。
在本章开始的时候,我们深入解释了kafka的消费者组,以及他们如何允许多个消费者共享从topic中读取消息的工作。在理论说明之后,我们用了一个实际的例子来说明消费者订阅一个topic并持续读取消息,然后我们研究了最重要的用户配置以及他们是如何影响用户行为的。我们在本章中花费了很大一部分实际来讨论补偿机制以及消费者如何跟踪和补偿。在编写可靠消费者时,理解消费者如何提交补偿式至关重要的。因此我们花费了一些时间来解释实现这一点的不同方法。然我们讨论了消费者API的其他不,处理reblance和优雅关闭消费者。
最后我们讨论了消费者用来存储在kafka中的字节数组如何转换为java对象的反序列化器。我们详细讨论了avro反序列化器,尽管他们知识你可以使用的反序列化器之一,因为他们是最常用的。
现在你已经知道如何使用kafka生产和消费事件消息。下一章我们将讨论kafka的内部实现。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。