赞
踩
本篇我们将从 Kafka 生产者的设计和组件讲起,学习如何使用 Kafka 生产者。
将演示如何创建 KafkaProducer 和 ProducerRecords 对象、如何将记录发送给 Kafka,以及如何处理Kafka 返回的错误,然后介绍用于控制生产者行为的重要配置选项,最后深入探讨如何使用不同的分区方法和序列化器,以及如何自定义序列化器和分区器。
很多情况下我们需要往 Kafka 写入消息,然而不同的场景对写入消息的要求也不一样,比如:是否允许消息丢失?是否允许重复消息?是否有严格的延迟和吞吐量要求?
不同的场景对上述要求往往都是不一样的。
因此,不同的使用场景对生产者 API 的使用和配置会有直接的影响。尽管生产者 API 使用起来很简单,但消息的发送过程还是有点复杂的。
Kafka发送消息的主要步骤如下图所示:
向Kafka 写入消息的第一步是要创建一个生产者对象,并设置一些属性。
Kafka 生产者有 3个必选的属性。
该属性指定 broker 的地址清单,地址的格式为 host:port。
清单里不需要包含所有的broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。
不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
broker 希望接收到的消息的键和值都是字节数组。
因此生产者需要知道如何把这些 Java 对象转换成字节数组。
key.serializer 必须被设置为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka 客户端默认提供了 ByteArraySerializer(这个只做很少的事情)、StringSerializer 和 IntegerSerializer,因此,如果你只使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器。
注意,key.serializer 是必须设置的,就算你打算只发送值内容。
与 key.serializer 一样,value.serializer 指定的类会将值序列化。
如果键和值都是字符串,可以使用与 key.serializer 一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。
下面用实际代码演示如何创建一个新的生产者(只指定了必要的属性,其他使用默认设置):
private Properties kafkaProps = new Properties(); // 1
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer"); // 2
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps); // 3
这个接口很简单,通过配置生产者的不同属性就可以很大程度地控制它的行为。我们将在后面部分介绍其中几个比较重要的参数。
实例化生产者对象后,接下来就可以开始发送消息了。发送消息主要有以下 3 种方式:
把消息发送给服务器,但并不关心它是否正常到达。
大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。
我们使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待,就可以知道消息是否发送成功。
调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。
在下面的几个例子中,我们会介绍如何使用上述几种方式来发送消息,以及如何处理可能发生的异常情况。
最简单的消息发送方式如下所示:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); // ➊
try {
producer.send(record); // ➋
} catch (Exception e) {
e.printStackTrace(); // ➌
}
这里的构造函数需要目标主题的名字和要发送的键和值对象,它们都是字符串。键和值对象的类型必须与序列化器和生产者对象相匹配。
send() 方法会返回一个包含 RecordMetadata 的 Future 对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。
这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。
最简单的同步发送消息方式如下所示:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get(); ➊
} catch (Exception e) {
e.printStackTrace(); ➋
}
KafkaProducer 一般会发生两类错误:
比如对于连接错误,可以通过再次建立连接来解决,“
无主(no leader)”错误则可以通过重新为分区选举首领来解决。
KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。
由于消息发送需要时间,获取响应需要等待。但大多数时候,我们并不需要等待响应——尽管 Kafka
会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。
在遇到消息发送失败时,我们可以抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。
为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。
下面是使用回调的一个例子:
private class DemoProducerCallback implements Callback { // ➊
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace(); // ➋
}
}
}
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); // ➌
producer.send(record, new DemoProducerCallback()); // ➍
注意:
这些回调在生产者的主线程中执行。这保证了当我们向同一个分区相继发送两条消息时,它们的回调将按照我们发送它们的相同顺序执行。
但这也意味着回调应该相当快,以避免延迟生产者并阻止发送其他消息。不建议在回调中执行阻塞操作。
相反,应该使用另一个线程来并发地执行任何阻塞操作。
上文只介绍了生产者的几个必要配置参数——bootstrap.servers API 以及序列化器。
生产者还有很多可配置的参数,在 Kafka 文档里都有说明,一般情况没必要去修改它们。
不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来我们会一一说明:
client.id是客户端和它所使用的应用程序的一个逻辑标识符。
它可以是任何字符串,Broker将使用它来识别从客户端发送的消息。
它用于日志记录和metrics以及quotas,选择一个好的客户端名称将使故障排除更加容易。
这就是“我们发现来自 IP 104.27.155.134 的身份验证失败率很高”和“订单验证服务似乎无法通过身份验证——你能请 Laura 看一下吗?”之间的区别。
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
这个参数对消息丢失的可能性有重要影响。该参数有如下选项:
也就是说, 生产者无法知道服务器有没有收到消息,即无法知道消息是否丢失。
不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大 速度发送消息,从而达到很高的吞吐量。
如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来), 生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。
它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。
不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
虽然,在较低和较不可靠的acks配置下,生产者将能够更快地发送记录。因为你用可靠性换取了生产者的延迟。然而,端到端的延迟是指从记录产生到消费者可以读取的时间,对于这三个选项来说是相同的。
原因是,为了保持一致性,Kafka直到它们被写入所有同步的副本,才允许消费者读取记录。
因此,如果你关心的是端到端的延迟,而不仅仅是生产者的延迟,就不需要做任何权衡:即使你选择最可靠的选项,你会得到相同的端到端延迟。
生产者有多个配置参数,它们相互作用以控制开发者最感兴趣的下列行为:从调用send()到返回成功或失败需要多长时间。这个时间,是我们愿意等待直到Kafka成功响应,或者直到我们愿意放弃并认为调用失败的时间。
多年来,这些配置和它们的行为被多次修改。
从Apache Kafka 2.1开始,我们将发送ProduceRecord的时间分为两个时间间隔,分别进行处理:
这与将 Produce Record 放入批处理中进行发送开始,直到Kafka回应成功、不可逆转的失败,或者我们分配的发送时间用完为止的时间是相同的。
注意:
如果你同步使用send(),发送线程将在两个时间间隔内连续阻塞,将无法知道每个时间间隔内分别花了多少时间。
所我们将推荐和讨论使用带有回调的异步send()。
生产者内部的数据流以及不同的配置参数如何相互影响,可以概括为下图:
我们将通过不同的配置参数来控制这两个区间的等待时间,以及它们如何相互作用。
这个参数控制生产者在调用send()和通过partitionsFor()显式请求元数据时可以阻塞多长时间。
当生产者的发送缓冲区已满或元数据不可用时,这些方法可能会阻塞。
当达阻塞时间到max.block.ms时,会抛出一个超时异常。
此配置将限制从记录准备好发送(send() 成功返回并且记录被放入批处理)到Broker响应或客户端放弃所花费的时间,包括重试所花费的时间。
如上图所示,这个时间应该大于linger.ms和request.timeout.ms。如果你尝试创建一个超时配置不一致的生产者,你会得到一个异常(这句话没太理解)。消息通常会比 delivery.timeout.ms 更快地成功发送。
如果生产者在重试时超过了 delivery.timeout.ms,回调将被调用,其异常与Broker在重试前返回的错误相对应。
如果在记录批处理仍在等待发送时超过delivery.timeout.ms,则回调将被调用并出现超时异常。
你可以将发送超时配置为你希望等待信息发送的最长时间,通常是几分钟,然后保留默认的重试次数(实际上是无限的)。
在这种配置下,只要没超时iu继续尝试,生产者就会不断重试(或直到成功),这是一种更合理的重试的方式。
该参数控制生产者在发送数据时等待服务器回复的时间。
注意,这是在放弃之前等待每个生产者请求所花费的时间; 它不包括重试、发送前花费的时间等.
如果达到超时而没有回复,生产者将重试发送,或以一个TimeoutException完成回调。
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,
不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。
一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。
你只需要处理那些不可重试的错误或重试次数超出上限的情况。
linger.ms 控制在发送当前批次前等待更多消息的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。
把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)
buffer.memory用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
这个时候,额外的 send() 调用将阻塞 max.block.ms的时间, 并在抛出异常之前等待空间释放。
默认情况下,消息发送时不会被压缩。
该参数可以设置为 snappy、gzip 或 lz4,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。
该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。
当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满
的批次,甚至只包含一个消息的批次也有可能被发送。
所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。
但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
该参数指定了生产者在收到服务器响应之前可以发送多少个消息。
它的值越高,就会占用越多的内存,不过也会提升吞吐量。
把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
Kafka会保留分区内消息的顺序。
这意味着如果消息以特定顺序从生产者发送,Broker将按该顺序将它们写入分区,所有消费者将按该顺序读取它们。
该参数用于控制生产者发送的请求大小。
它同时限制了发送的单个消息的最大值,和单个请求里所有消息总的大小。
例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。
另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。
这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。
如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
从 0.11 版本开始,Kafka 支持 exactly once 语义。
Exactly once 是一个相当大的话题,后面专门讨论它,但idempotent producer(幂等生产者)是其中比较简单且非常有用的部分。
假设你配置你的生产者以最大化可靠性:acks=all和一个适当大的delivery.timeout.ms以允许足够的重试。这些配置确保每条消息至少写入一次 Kafka。
例如,假设一个 broker 从生产者那里收到一条记录,将其写入本地磁盘,并且记录已成功复制到其他brokers,但随后第一个broker在向生产者发送响应之前崩溃了。生产者将等待,直到到达request. timeout.ms,然后重试。重试将转到新的leader,该leader已经拥有该记录的副本,因为之前的写被成功复制了,所以记录就会重复。
为避免这种情况,您可以设置 enable.idempotence=true。当闲置的idempotent producer被启用时,生产者将为其发送的每条记录附上一个序列号。
如果broker收到具有相同序列号的记录,它将拒绝第二个副本,生产者将收到无害的DuplicateSequenceException。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。