赞
踩
数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。
通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。
1.「生产端:」 遇到异常,基本解决措施都是 「重试」。
场景一:leader
分区不可用了,抛 LeaderNotAvailableException
异常,等待选出新 leader
分区。
场景二:Controller
所在 Broker
挂了,抛 NotControllerException
异常,等待 Controller
重新选举。
场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException
异常,等待网络恢复。
2.「消费端:」 poll
一批数据,处理完毕还没提交 offset
,机子宕机重启了,又会 poll
上批数据,再度消费就造成了消息重复。
「先来了解下消息的三种投递语义:」
最多一次(at most once
): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如:mqtt
中 QoS = 0
。
至少一次(at least once
): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如:mqtt
中 QoS = 1
精确一次(exactly once
): 消息精确发一次,消息不会丢失,也不会被重复发送。例如:mqtt
中 QoS = 2
。
Kafka
幂等性 Producer
: 保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话)
Kafka
事务: 保证生产端发送消息幂等。解决幂等 Producer
的局限性。
消费端幂等: 保证消费端接收消息幂等。蔸底方案。
Kafka
幂等性 Producer
❝「幂等性指」:无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。
❞
「幂等性使用示例:在生产端添加对应配置即可」
- Properties props = new Properties();
- props.put("enable.idempotence", ture); // 1. 设置幂等
- props.put("acks", "all"); // 2. 当 enable.idempotence 为 true,这里默认为 all
- props.put("max.in.flight.requests.per.connection", 5); // 3. 注意
1.设置幂等,启动幂等。
2.配置 acks
,注意:一定要设置 acks=all
,否则会抛异常。
3.配置max.in.flight.requests.per.connection
需要 <= 5
,否则会抛异常 OutOfOrderSequenceException
。
0.11 >= Kafka < 1.1
, max.in.flight.request.per.connection = 1
Kafka >= 1.1
, max.in.flight.request.per.connection <= 5
1.Producer
每次启动后,会向 Broker
申请一个全局唯一的 pid
。(重启后 pid
会变化,这也是弊端之一)
2.Sequence Numbe
:针对每个 <Topic, Partition>
都对应一个从0开始单调递增的 Sequence
,同时 Broker
端会缓存这个 seq num
3.判断是否重复: 拿 <pid, seq num>
去 Broker
里对应的队列 ProducerStateEntry.Queue
(默认队列长度为 5)查询是否存在
如果 nextSeq == lastSeq + 1
,即 服务端seq + 1 == 生产传入seq
,则接收。
如果 nextSeq == 0 && lastSeq == Int.MaxValue
,即刚初始化,也接收。
反之,要么重复,要么丢消息,均拒绝。
「消息重复:」 场景 Broker
保存消息后还没发送 ack
就宕机了,这时候 Producer
就会重试,这就造成消息重复。
「消息乱序:」 避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。
如果已经使用 acks=all
,使用幂等也可以。
如果已经使用 acks=0
或者 acks=1
,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。
Kafka
事务❝使用
Kafka
事务解决幂等的弊端:单会话且单分区幂等。「
❞Tips
:」 这块篇幅较长,这先稍微提及下使用,之后另起一篇。
「事务使用示例:分为生产端 和 消费端」
- Properties props = new Properties();
- props.put("enable.idempotence", ture); // 1. 设置幂等
- props.put("acks", "all"); // 2. 当 enable.idempotence 为 true,这里默认为 all
- props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待数
- props.put("transactional.id", "my-transactional-id"); // 4. 设定事务 id
-
- Producer<String, String> producer = new KafkaProducer<String, String>(props);
-
- // 初始化事务
- producer.initTransactions();
-
- try{
- // 开始事务
- producer.beginTransaction();
-
- // 发送数据
- producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
-
- // 数据发送及 Offset 发送均成功的情况下,提交事务
- producer.commitTransaction();
- } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
- // 数据发送或者 Offset 发送出现异常时,终止事务
- producer.abortTransaction();
- } finally {
- // 关闭 Producer 和 Consumer
- producer.close();
- consumer.close();
- }
这里消费端Consumer
需要设置下配置:isolation.level
参数
「read_uncommitted
:」 这是默认值,表明 Consumer
能够读取到 Kafka
写入的任何消息,不论事务型 Producer
提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型 Producer
,那么对应的 Consumer
就不要使用这个值。
「read_committed
:」 表明 Consumer
只会读取事务型 Producer
成功提交事务写入的消息。当然了,它也能看到非事务型 Producer
写入的所有消息。
❝“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。
只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。
❞
「典型的方案是使用:消息表,来去重:」
上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id
新增到本地消息表中,同时更新订单信息。
如果消息重复,则新增操作 insert
会异常,同时触发事务回滚。
❝环境搭建可参考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic
❞
「准备工作如下:」
1、Zookeeper:本地使用 Docker
启动
- $ docker run -d --name zookeeper -p 2181:2181 zookeeper
- a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
2、Kafka:版本 2.7.1
,源码编译启动(看上文源码搭建启动)
3、启动生产者:Kafka
源码中 exmaple
中
4、启动消息者:可以用 Kafka
提供的脚本
- # 举个栗子:topic 需要自己去修改
- $ cd ./kafka-2.7.1-src/bin
- $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
创建topic
:1副本,2 分区
- $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2
-
- # 查看
- $ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
「生产者代码:」
- public class KafkaProducerApplication {
-
- private final Producer<String, String> producer;
- final String outTopic;
-
- public KafkaProducerApplication(final Producer<String, String> producer,
- final String topic) {
- this.producer = producer;
- outTopic = topic;
- }
-
- public void produce(final String message) {
- final String[] parts = message.split("-");
- final String key, value;
- if (parts.length > 1) {
- key = parts[0];
- value = parts[1];
- } else {
- key = null;
- value = parts[0];
- }
- final ProducerRecord<String, String> producerRecord
- = new ProducerRecord<>(outTopic, key, value);
- producer.send(producerRecord,
- (recordMetadata, e) -> {
- if(e != null) {
- e.printStackTrace();
- } else {
- System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
- }
- }
- );
- }
-
- public void shutdown() {
- producer.close();
- }
-
- public static void main(String[] args) {
-
- final Properties props = new Properties();
-
- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
- props.put(ProducerConfig.ACKS_CONFIG, "all");
-
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- final String topic = "myTopic";
- final Producer<String, String> producer = new KafkaProducer<>(props);
- final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);
-
- String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
- try {
- List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
- linesToProduce.stream().filter(l -> !l.trim().isEmpty())
- .forEach(producerApp::produce);
- System.out.println("Offsets and timestamps committed in batch from " + filePath);
- } catch (IOException e) {
- System.err.printf("Error reading file %s due to %s %n", filePath, e);
- } finally {
- producerApp.shutdown();
- }
- }
- }
「启动生产者后,控制台输出如下:」
「启动消费者:」
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
启用幂等的情况下,调整acks
配置,生产者启动后结果是怎样的:
修改配置 acks = 1
修改配置 acks = 0
会直接报错:
- Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer.
- Otherwise we cannot guarantee idempotence.
「启用幂等的情况下,调整此配置,结果是怎样的:」
将 max.in.flight.requests.per.connection > 5
会怎样?
「当然会报错:」
Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。