In the previous section we covered the following content:
Before Kafka introduced idempotence, the Producer would send a message to the Broker, and the Broker would append the message to the message stream and then return an ACK to the Producer. The flow is as follows:
In production, all kinds of uncertainties arise; for example, a network exception may occur while the Producer is sending to the Broker. Consider the following abnormal case:
In the situation shown above, when the Producer first sends message (x2, y2) to the Broker, the Broker appends it to the message stream, but fails when returning the ACK to the Producer (for example, because of a network exception). The Producer then triggers its retry mechanism and resends (x2, y2); the Broker receives it and appends it to the message stream again, and this time successfully returns the ACK. As a result, two identical (x2, y2) messages end up in the message stream.
We must guarantee that when a message is resent, the consumer does not process it twice; and even if the consumer does receive and reprocess a duplicate message, the final result must still be consistent.
In mathematical terms, idempotence means f(f(x)) = f(x), where f denotes the processing of a message: applying the processing twice yields the same result as applying it once.
For example, a bank transfer that fails must be retried, and no matter how many times it is retried, the final result must be the same.
Add a unique ID, similar to a database primary key, to identify each message:
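To make the idea concrete, here is a minimal, hypothetical sketch of receiver-side deduplication using such a unique ID. The class, set, and method names are illustrative only; they are not Kafka APIs:

import java.util.HashSet;
import java.util.Set;

public class DedupReceiver {
    // IDs of messages that have already been processed (illustrative only)
    private final Set<String> processedIds = new HashSet<>();

    public void handle(String messageId, String payload) {
        // Set.add() returns false if the ID was already present,
        // meaning this message is a retried duplicate: skip it
        if (!processedIds.add(messageId)) {
            return;
        }
        // First delivery: process the message
        System.out.println("Processing " + messageId + ": " + payload);
    }
}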
To implement idempotence, Kafka introduced a ProducerID (PID) and a SequenceNumber into its underlying design: the PID is assigned to each Producer when it is initialized, and the SequenceNumber increases monotonically for each message that Producer sends.
Again, this is the send flow under ideal conditions. In practice there are many uncertainties; for example, a network exception may occur when the Broker sends the ACK to the Producer, causing the send to fail. The abnormal case is shown in the figure below:
When the Producer sends message (x2, y2) to the Broker, the Broker receives it and appends it to the message stream. When the Broker then returns the ACK to the Producer, an exception occurs and the Producer fails to receive it. From the Producer's point of view this triggers the retry mechanism, and (x2, y2) is sent again. However, with idempotence enabled, every message carries a PID (ProducerID) and a SequenceNumber. The retried message arrives with the same PID and SequenceNumber, and because the Broker has cached the previously received message, only a single (x2, y2) ends up in the message stream; no duplicate is appended.
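As a rough illustration, the broker-side check can be thought of as remembering the last sequence number seen for each PID and dropping anything that is not strictly newer. This is a greatly simplified, hypothetical sketch, not Kafka's actual broker code; real Kafka tracks sequence numbers per PID and partition and expects each new batch to continue the sequence exactly:

import java.util.HashMap;
import java.util.Map;

public class SequenceCheck {
    // Last sequence number appended for each ProducerID
    // (simplified: per PID only, not per partition)
    private final Map<Long, Integer> lastSeqByPid = new HashMap<>();

    /** Returns true if the message should be appended,
        false if it is a retried duplicate. */
    public boolean shouldAppend(long pid, int sequenceNumber) {
        Integer lastSeq = lastSeqByPid.get(pid);
        if (lastSeq != null && sequenceNumber <= lastSeq) {
            return false; // already appended: a retried duplicate
        }
        lastSeqByPid.put(pid, sequenceNumber);
        return true;
    }
}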
// Instantiate a Producer object
Producer<String, String> producer = new KafkaProducer<>(props);
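The props object referenced above is not shown in the original; a minimal sketch of a configuration with idempotence enabled might look like the following (the broker address is a placeholder, and ProducerConfig / StringSerializer come from the Kafka client library, plus java.util.Properties):

Properties props = new Properties();
// Placeholder broker address
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Turn on the idempotent producer (PID + SequenceNumber deduplication)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);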
In the org.apache.kafka.clients.producer.internals.Sender class, the run() method calls maybeWaitForPid(), which is used to obtain a ProducerID. The implementation is as follows:
private void maybeWaitForPid() {
    if (transactionState == null) {
        return;
    }
    while (!transactionState.hasPid()) {
        try {
            Node node = awaitLeastLoadedNodeReady(requestTimeout);
            if (node != null) {
                ClientResponse response = sendAndAwaitInitPidRequest(node);
                if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                    InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                    transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                } else {
                    log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                            "We will back off and try again.", node);
                }
            } else {
                log.debug("Could not find an available broker to send InitPidRequest to. " +
                        "We will back off and try again.");
            }
        } catch (Exception e) {
            log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
        }
        log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
        time.sleep(retryBackoffMs);
        metadata.requestUpdate();
    }
}
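In short, the method loops until transactionState holds a PID: each iteration waits for the least-loaded ready node, sends an InitPidRequest to it, and on success stores the returned producerId and epoch via setPidAndEpoch(); on any failure it logs the problem, then backs off for retryBackoffMs and requests a metadata update before retrying.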
In a Kafka transaction, an atomic operation falls into one of three cases depending on the type of operation involved: only the Producer produces messages; consuming and producing coexist within the same transaction (the consume-transform-produce pattern); or only the Consumer consumes messages. The transactional API on the Producer is as follows:
// Initialize the transaction; note that the transactional.id property must be set
void initTransactions();
// Begin the transaction
void beginTransaction() throws ProducerFencedException;
// Commit offsets, on behalf of the Consumer, within the transaction
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;
// Commit the transaction
void commitTransaction() throws ProducerFencedException;
// Abort the transaction, similar to rolling it back
void abortTransaction() throws ProducerFencedException;
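These methods are invoked in a fixed order: initTransactions() is called once after the producer is created; each transaction then wraps beginTransaction(), one or more send() calls (plus sendOffsetsToTransaction() in a consume-transform-produce flow), and finally commitTransaction() or abortTransaction(). The two cases below follow exactly this pattern.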
package icu.wzk.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class MyTransactionalProducer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Client ID
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
        // Transactional ID
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
        // Require acknowledgment from all ISR replicas
        configs.put(ProducerConfig.ACKS_CONFIG, "all");

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        // Initialize the transaction
        producer.initTransactions();
        // Begin the transaction
        producer.beginTransaction();
        try {
            // Send a message
            producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_02"));
            // You can throw an exception here to test the abort path, e.g.:
            // int n = 1 / 0;
            // Commit the transaction
            producer.commitTransaction();
        } catch (Exception e) {
            // Abort the transaction
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}
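One practical note: initTransactions() requires the broker to create the internal __transaction_state topic. On a single-node test broker like the one used here, that typically means setting transaction.state.log.replication.factor=1 and transaction.state.log.min.isr=1 in the broker configuration; otherwise the call may block waiting for enough replicas.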
After running it, the console output is as follows:
package icu.wzk.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MyTransactional {
    public static void main(String[] args) {
        KafkaProducer<String, String> producer = getProducer();
        KafkaConsumer<String, String> consumer = getConsumer();

        // Initialize the transaction
        producer.initTransactions();
        // Subscribe to the input topic
        consumer.subscribe(Collections.singleton("tp_tx_01"));
        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

        // Begin the transaction
        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                producer.send(new ProducerRecord<>("tp_tx_out_01", record.key(), record.value()));
                offsets.put(
                        new TopicPartition(record.topic(), record.partition()),
                        // The offset denotes the next message to consume
                        new OffsetAndMetadata(record.offset() + 1)
                );
            }
            // Commit the consumed offsets as part of the transaction,
            // so they are committed or aborted together with it
            // (consumer offsets are not committed separately)
            producer.sendOffsetsToTransaction(offsets, "consumer_grp_02");
            // Commit the transaction
            producer.commitTransaction();
        } catch (Exception e) {
            e.printStackTrace();
            // Abort the transaction
            producer.abortTransaction();
        } finally {
            producer.close();
            consumer.close();
        }
    }

    public static KafkaProducer<String, String> getProducer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Client ID
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
        // Transactional ID
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");
        // Require acknowledgment from all ISR replicas
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        // Enable idempotence
        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new KafkaProducer<>(configs);
    }

    public static KafkaConsumer<String, String> getConsumer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Consumer group ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");
        // Disable auto-commit of consumer offsets, and do not commit manually either,
        // since offsets are committed through the transaction
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(configs);
    }
}
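One more detail worth noting: any consumer reading the output topic tp_tx_out_01 should set isolation.level=read_committed so that messages from aborted transactions are filtered out; with the default read_uncommitted it would also see data written by transactions that were later aborted.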
(Because Kafka on my test cloud server went offline, I started it again and re-ran Case 1.)
Below is the result of Case 2, shown in the following figure: