赞
踩
Kafka的精确一次性语义与国际象棋正好相反:要理解它不容易,但用起来很简单。
本文将介绍 Kafka 实现精确一次性语义的两个关键机制:幂等生产者(避免由重试机制导致的重复处理)和事务(Streams精确一次性语义的基础)。
通过一个配置就可以启用它们,这样就可以很方便地在要求更少重复和更高正确性的应用程序中使用Kafka了。
Kafka的幂等生产者可以自动检测并解决消息重复问题。
唯一标识
max.inflight.requests
设置成5或更小的值(默认值是5)。错误指标记录
record-error-rate
指标当中。在broker端,错误是ErrorsPerSec
指标的一部分(RequestMetrics类型)。乱序错误
虽然生产者在遇到“乱序”异常后将继续正常运行,但这个错误通常说明生产者和broker之间出现了消息丢失——如果broker在收到消息2之后直接收到消息27,那么说明从消息3到消息26一定发生了什么。如果在日志中看到这样的错误,那么最好重新检查一下生产者和主题的配置,确保为生产者配置了高可靠性参数,并检查是否发生了不彻底的首领选举。
一般发生下边两种情况会导致幂等性被破坏:生产者重启和broker故障。
生产者重启
broker故障
幂等生产者只能防止由生产者内部重试逻辑引起的消息重复。对于使用同一条消息调用两次producer.send()
就会导致消息重复的情况,即使使用幂等生产者也无法避免。
这是因为生产者无法知道这两条消息实际上是一样的。通常建议使用生产者内置的重试机制,而不是在应用程序中捕获异常并自行进行重试。使用幂等生产者是在进行重试时避免消息重复的最简单的方法。
幂等生产者只能防止因生产者自身的重试机制而导致的消息重复,不管这种重试是由生产者、网络还是broker错误所导致。
幂等生产者使用起来非常简单,只需在生产者配置中加入enable.idempotence=true
。如果生产者已经配置了acks=all,那么在性能上就不会有任何差异。在启用了幂等生产者之后,会发生下面这些变化:
max.in.flight.requests.per.connection
被设置为大于1的值(5是默认值,这也是幂等生产者可以支持的最大值)。Kafka的事务机制是专门为流式处理应用程序而添加的。因此,它非常适用于流式处理应用程序的基础模式,即“消费–处理–生产”。事务可以保证流式处理的精确一次性语义——在更新完应用程序内部状态并将结果成功写入输出主题之后,对每个输入消息的处理就算完成了。
一些流式处理应用程序对准确性要求较高,特别是如果处理过程包含了聚合或连接操作,那么事务对它们来说就会非常有用。
如果流式处理应用程序只进行简单的转换和过滤,那么就不需要更新内部状态,即使出现了重复消息,也可以很容易地将它们过滤掉。
但是,如果流式处理应用程序对几条消息进行了聚合,一些输入消息被统计了不止一次,那么就很难知道结果是不是错误的。如果不重新处理输入消息,则不可能修正结果。
金融行业的应用程序就是典型的复杂流式处理的例子,在这些应用程序中,精确一次性被用于保证精确的聚合结果。不过,因为可以非常容易地在Streams应用程序中启用精确一次性保证,所以已经有非常多的应用场景(如聊天机器人)启用了这个特性。
应用程序崩溃导致的重复处理
消费者处理消息有两个必须的步骤:
一:处理消费的消息,并且将消息写入目标主题。
二:提交消费消息偏移量
假设在消费处理完消息后,提交偏移量前,消费者应用程序奔溃,将会导致重复消费的问题。
“僵尸”应用程序导致的重复处理
继续以流式处理应用程序为例。它会从一个主题读取数据,对数据进行处理,再将结果写入另一个主题。精确一次处理意味着消费、处理和生产都是原子操作,要么提交偏移量和生成结果这两个操作都成功,要么都不成功。我们要确保不会出现只有部分操作执行成功的情况(提交了偏移量但没有生成结果,反之亦然)。
为了支持这种行为,Kafka事务引入了原子多分区写入的概念。我们知道,提交偏移量和生成结果都涉及向分区写入数据,结果会被写入输出主题,偏移量会被写入consumer_offsets主题。如果可以打开一个事务,向这两个主题写入消息,如果两个写入操作都成功就提交事务,如果不成功就中止,并进行重试,那么就会实现我们所追求的精确一次性语义。
开启生产者事物配置参数
下图是一个简单的流式处理应用程序,它会在执行原子多分区写入的同时提交消息偏移量:
transactional.id
并用initTransactions()
方法初始化的Kafka生产者。producer.id
(由broker自动生成)不同,transactional.id
是一个生产者配置参数,在生产者重启之后仍然存在。transactional.id
主要用于在重启之后识别同一个生产者。broker维护了transactional.id
和producer.id
之间的映射关系,如果对一个已有的transactional.id
再次调用initTransactions()
方法,则生产者将分配到与之前一样的producer.id
,而不是一个新的随机数。隔离僵尸程序
initTransaction()
方法初始化事务性生产者时,Kafka会增加与transactional.id
相关的epoch
。带有相同transactional.id
但epoch
较小的发送请求、提交请求和中止请求将被拒绝,并返回FencedProducer
错误。旧生产者将无法写入输出流,并被强制close()
,以防止“僵尸”引入重复记录。在很大程度上,事务是一个生产者特性。创建事务性生产者、开始事务、将记录写入多个分区、生成偏移量并提交或中止事务,这些都是由生产者完成的。然而,这些还不够。以事务方式写入的记录,即使是最终被中止的部分,也会像其他记录一样被写入分区。消费者也需要配置正确的隔离级别,否则将无法获得我们想要的精确一次性保证。
消费者开启事物参数配置
isolation.level
参数来控制消费者如何读取以事务方式写入的消息。read_committed
,那么调用consumer.poll()
将返回属于已成功提交的事务或以非事务方式写入的消息,它不会返回属于已中止或执行中的事务的消息。read_uncommitted
,它将返回所有记录,包括属于执行中或已中止的事务的记录。read_committed
并不能保证应用程序可以读取到特定事务的所有消息。也可以只订阅属于某个事务的部分主题,这样就可以只读取部分消息。此外,应用程序无法知道事务何时开始或结束,或者哪些消息是哪个事务的一部分。transaction.timeout.ms
参数指定,默认为15分钟)并被broker终止。长时间使事务处于打开状态会导致消费者延迟,从而导致更高的端到端延迟。Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); ➊ producer = new KafkaProducer<>(producerProps); Properties consumerProps = new Properties(); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); ➋ consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); ➌ consumer = new KafkaConsumer<>(consumerProps); producer.initTransactions(); ➍ consumer.subscribe(Collections.singleton(inputTopic)); ➎ while (true) { try { ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200)); if (records.count() > 0) { producer.beginTransaction(); ➏ for (ConsumerRecord<Integer, String> record : records) { ProducerRecord<Integer, String> customizedRecord = transform(record); ➐ producer.send(customizedRecord); } Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets(); producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());➑ producer.commitTransaction(); ➒ } } catch (ProducerFencedException|InvalidProducerEpochException e) { ➓ throw new KafkaException(String.format( "The transactional.id %s is used by another process", transactionalId)); } catch (KafkaException e) { producer.abortTransaction(); ⓫ resetToLastCommittedPositions(consumer); }}
Kafka事务的基本算法受到了Chandy-Lamport快照的启发,它会将一种被称为“标记”(marker)的消息发送到通信通道中,并根据标记的到达情况来确定一致性状态。
Kafka事务根据标记消息来判断跨多个分区的事务是否被提交或被中止——当生产者要提交一个事务时,它会发送“提交”消息给事务协调器,事务协调器会将提交标记写入所有涉及这个事务的分区。
如果生产者在向部分分区写入提交消息后发生崩溃,该怎么办?Kafka事务使用两阶段提交和事务日志来解决这个问题。总的来说,这个算法会执行如下步骤:
要实现这个算法,Kafka需要一个事务日志。这里使用了一个叫作 __transaction_state
的内部主题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。