赞
踩
当 producer 向 broker 发送消息时,一旦这条消息被 commit,由于副本机制 (replication) 的存在,它就不会丢失。但是如果 producer 发送数据给 broker 后,遇到的网络问题而造成通信中断,那 producer 就无法判断该条消息是否已经提交(commit)。
虽然 Kafka 无法确定网络故障期间发生了什么,但是 producer 可以 retry 多次,确保消息已经正确传输到 broker 中,所以目前 Kafka 实现的是 at least once。
所谓幂等性,就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用 Kafka 的幂等性功能就可以避免这种情况。
1)只能保证 Producer 在单个会话内不丢不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重)。
2)幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。
幂等性并不能跨多个分区运作,而事务可以弥补这个缺憾,事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功部分失败的可能。
为了实现事务,应用程序必须提供唯一的 transactionId,这个参数通过客户端程序来进行设定。
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
事务要求生产者开启幂等性特性,因此通过将 transaction.id 参数设置为非空从而开启事务特性的。
同时需要将 ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG 设置为 true (默认值为 true),如果显示设置为 false,则会抛出异常。
--> idea --> File
--> New --> Project
--> Maven
Project SDK: ( 1.8(java version "1.8.0_131" )
--> Next
--> Groupld : ( djh.it )
Artifactld : ( kafka_learn )
Version : 1.0-SNAPSHOT
--> Name: ( kafka_learn )
Location: ( ...\kafka_learn\ )
--> Finish
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>djh.it</groupId> <artifactId>kafka_learn</artifactId> <version>1.0-SNAPSHOT</version> <name>kafka_learn</name> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.8.RELEASE</version> <relativePath></relativePath> </parent> <properties> <java.version>8</java.version> <!-- <scala.version>2.11</scala.version>--> <scala.version>2.12</scala.version> <slf4j.version>1.7.21</slf4j.version> <!-- <kafka.version>2.0.0</kafka.version>--> <kafka.version>2.8.0</kafka.version> <lombok.version>1.18.8</lombok.version> <junit.version>4.11</junit.version> <gson.version>2.2.4</gson.version> <protobuff.version>1.5.4</protobuff.version> <!-- <spark.version>2.3.1</spark.version>--> <spark.version>2.4.8</spark.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.version}</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>${gson.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>${protobuff.version}</version> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>${protobuff.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.9.4</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-scala_2.11</artifactId> <version>2.9.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> <!-- kafka_learn\pom.xml -->
/** * kafka_learn\src\main\java\djh\it\kafka\learn\chapter7\ProducerTransactionSend.java * * 2024-6-24 创建 生产者 类 ProducerTransactionSend.java 演示事务。 */ package djh.it.kafka.learn.chapter7; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerTransactionSend { //private static final String brokerList = "localhost:9092"; private static final String brokerList = "172.18.30.110:9092"; private static final String topic = "heima"; //提供唯一的事务 transactionId private static final String transactionId = "transactionId"; public static void main( String[] args ) { Properties properties = new Properties(); //1)设置 key 序列化器 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //2)设置重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 10); //3)设置值 value 序列化器 properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //4)设置集群地址 properties.put("bootstrap.servers", brokerList); //properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); //5)设置事务唯一的 transactionId properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId); //6)生产者开启幂等性 properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties); //初始化事务 producer.initTransactions(); //开启事务 producer.beginTransaction(); try{ //处理业务逻辑 ProducerRecord<String,String> record1 = new ProducerRecord<>(topic, "transaction-message-01"); producer.send(record1); // //模拟异常,出现异常会统一回滚事务,一个消息也收不到 // System.out.println(1/0); ProducerRecord<String,String> record2 = new ProducerRecord<>(topic, "transaction-message-02"); producer.send(record2); ProducerRecord<String,String> record3 = new ProducerRecord<>(topic, "transaction-message-03"); producer.send(record3); //提交事务 producer.commitTransaction(); }catch (Exception e){ //回滚事务 producer.abortTransaction(); } producer.close(); } }
/** * kafka_learn\src\main\java\djh\it\kafka\learn\chapter7\ConsumerFastStart7.java * * 2024-6-24 创建 消费者类 ConsumerFastStart7.java 进行事务演示。 */ package djh.it.kafka.learn.chapter7; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; //注意导包,一定要导成 kafka 的序列化包 import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class ConsumerFastStart7 { //private static final String brokerList = "localhost:9092"; private static final String brokerList = "172.18.30.110:9092"; private static final String topic = "heima"; private static final String groupId = "group.demo"; public static void main( String[] args ) { Properties properties = new Properties(); //1)设置 key 序列化器 -- 优化代码 //properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //2)设置值序列化器 -- 优化代码 //properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //3)设置集群地址 -- 优化代码 //properties.put("bootstrap.servers", brokerList); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); //properties.put("group.id", groupId); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(Collections.singletonList(topic)); while (true){ ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord<String,String> record : records){ System.out.println(record.value()); } } } }
事务案例演示说明:
1)模拟异常, transaction-message-01 发送成功之后,出现异常,事务回滚。
System.out.println(1/0);
2)虽然 transaction-message-01 发送成功,但是 消费端一个消息也收不到。
演示事务.png
在 Kafka 集群中会有一个或者多个 broker,其中有一个 broker 会被选举为控制器 (KafkaController),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
使用 zookeeper 图形化的客户端工具(ZooInspector)提供的 jar 来进行管理,启动如下:
- 1、定位到jar 所在目录
- 2、运行 jar 文件 java -jar zookeeper-dev-Zoolnspector.jar
- 3、连接 Zookeeper
kafka 可靠性保证: 确保系统在各种不同的环境下能够发生一致的行为。
1)保证分区消息的顺序
2)只有当消息被写入分区的所有同步副本时(文件系统缓存),它才被认为是已提交
3)只要还有一个副本是活跃的,那么已提交的消息就不会丢失。
4)消费者只能读取已经提交的消息。
怎么样判定一个分区是否有副本是处于同步失效状态的呢?
1)从Kafka 0.9.x 版本开始通过唯一的一个参数 replica.lag.time.max.ms(默认大小为10,000)来控制,当 ISR 中的一个 follower 副本滞后 leader 副本的时间超过参数 replica.lag.time.max.ms 指定的值时即判定为副本失效,需要将此 follower 副本剔出除 ISR 之外。具体实现原理很简单,当 follower 副本将 leader 副本的 LEO(LogEnd Offset,每个分区最后一条消息的位置)之前的日志全部同步时,则认为该 follower 副本已经追赶上 leader 副本,此时更新该副本的 lastCaughtUpTimeMs 标识。
2)Kafka 的副本管理器(ReplicaManager)启动时会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数 replica.lag.time.max.ms 指定的值。千万不要错误的认为 follower 副本只要拉取 leader 副本的数据就会更新 lastCaughtUpTimeMs,试想当 leader 副本的消息流入速度大于 follower 副本的拉取速度时,follower 副本一直不断的拉取leader副本的消,息也不能与 leader 副本同步,如果还将此 follower 副本置于 ISR 中,那么当 leader 副本失效,而选取此 follower 副本为新的 leader 副本,那么就会有严重的消息丢失。
1)Kafka 中的每个主题分区都被复制了n次,其中的n是主题的复制因子(replication factor)。这允许 Kafka 在集群服务器发生故障时自动切换到这些副本,以便在出现故障时消息仍然可用。
2)Kafka 的复制是以分区为粒度的。分区的预写日志被复制到n个服务器。
3)在n个副本中,一个副本作为 leader,其他副本成为 followers。顾名思义,producer 只能往 leader 分区上写数据(读也只能从leader分区上进行),followers 只按顺序从 leader 上复制日志。
4)一个副本可以不同步 Leader 有如下几个原因:
1、慢副本: 在一定周期时间内 follower 不能追赶上 leader。最常见的原因之一是 I/0 瓶颈导致 follower 追加复制消息慢于从 leader 拉取速度。
2、卡住副本: 在一定周期时间内 follower 停止从 leader 拉取请求。follower replica 卡住了是由于 GC 暂停或 follower失效或死亡。
3、新启动副本: 当用户给主题增加副本因子时,新的 follower 不在同步副本列表中,直到他们完全赶上了 leader 日志。
1)在服务端现在只有一个参数需要配置 replica.lag.time.max.ms。这个参数解释 replicas 响应 partition leader 的最长等待时间。
2)检测卡住或失败副本的探测–如果一个 replica 失败导致发送拉取请求时间间隔超过 replica.lag.time.max.ms。Kafka 会认为此 replica 已经死亡会从同步副本列表从移除。
3)检测慢副本机制发生了变化–如果一个 replica 开始落后 leader 超过 replica.lag.time.max.ms。Kafka 会认为太缓慢并且会从同步副本列表中移除。除非 replica 请求 leader 时间间隔大于 replica.lag.time.max.ms,
4)因此即使 leader 使流量激增和大批量写消息。Kafka 也不会从同步副本列表从移除该副本。
在 leader 宕机后,只能从 ISR 列表中选取新的 leader,无论 ISR 中哪个副本被选为新的 leader,它都知道 HW 之前的数据,可以保证在切换了leader 后,消费者可以继续看到 HW 之前已经提交的数据。
HW 的截断机制: 选出了新的 leader,而新的 leader 并不能保证已经完全同步了之前 leader的 所有数据,只能保证 HW 之前的数据是同步过的,此时所有的 follower 都要将数据截断到 HW 的位置,再和新的 leader 同步数据,来保证数据一致。
当宕机的 leader 恢复,发现新的 leader 中的数据和自己持有的数据不一致,此时宕机的 leader 会将自己的数据截断到宕机之前的 hw 位置,然后同步新 leader 的数据。宕机的 leader 活过来也像 follower 一样同步数据,来保证数据的一致性。
1)在于 HW 值被用于衡量副本备份的成功与否以及在出现 failture 时作为日志截断的依据,
2)HW 值的更新是异步延迟的,特别是需要额外的 FETCH 请求处理流程才能更新,故这中间发生的任何崩溃都可能导致 HW 值的过期。
Leader 端多开辟一段内存区域专门保存 leader 的 epoch 信息,这样即使出现 数据丢失场景和数据出现不一致场景两个场景也能很好地规避这些问题。
1)所谓 leader epoch 实际上是一对值:(epoch,offset)。epoch 表示 leader 的版本号,从0开始,当 leader 变更过1次时 epoch 就会+1,而 offset 则对应于该 epoch 版本的 leader 写入第一条消息的位移。
2)因此假设有两对值:
(0, 0)
(1,120)
则表示第一个 leader 从位移0开始写入消息;共写了120条[0,119];而第二个leader版本号是1,从位移120处开始写入消息。
leader broker 中会保存这样的一个缓存,并定期地写入到一个 checkpoint 文件中。
1)kafka 生产者端重复 问题描述:
生产发送的消息没有收到正确的 broke 响应,导致 producer 重试。producer 发出一条消息,broke 落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后 producer 收到一个可恢复的 Exception 重试消息导致消息重复。
2)kafka 生产者端重复 解决方案:
要启动 kafka 的幂等性,无需修改代码,默认为关闭,需要修改配置文件: enable.idempotence=true 同时要求 ack=all 目 retries>1。
可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。
1)kafka 消费者端重复 根本原因:
数据消费完没有及时提交 offset 到 broker。
2)kafka 消费者端重复 解决方案:
每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。
一般的解决方案是让下游做幂等或者尽量每消费一条消息都记录 offset,对于少数严格的场景可能需要把 offset 或唯一 ID,例如订单 ID 和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位点做乐观锁拒绝掉旧位点的数据更新。
consumer_offsets 是一个内部 topic,对用户而言是透明的,除了它的数据文件以及偶尔在日志中出现这两点之外,用户一般是感觉不到这个 topic 的。不过我们的确知道它保存的是 Kafka 新版本 consumer 的位移信息。
一般情况下,当集群中第一有消费者消费消息时会自动创建主题_consumer_offsets,分区数可以通过 offsets.topic.num.partitions 参数设定,默认值为 50。
1)Kafka 幂等性
2)kafka 事务的处理
3)kafka 可靠性保证
4)kafka 一致性保证
5)kafka 消息重复以及决方案。
上一节关联链接请点击
# Kafka_深入探秘者(6):kafka 物理存储
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。