当前位置:   article > 正文

# Kafka_深入探秘者(7):kafka 稳定性

# Kafka_深入探秘者(7):kafka 稳定性

Kafka_深入探秘者(7):kafka 稳定性

一、kafka 幂等性

1、Kafka 的消息传输保障机制非常直观。

当 producer 向 broker 发送消息时,一旦这条消息被 commit,由于副本机制 (replication) 的存在,它就不会丢失。但是如果 producer 发送数据给 broker 后,遇到的网络问题而造成通信中断,那 producer 就无法判断该条消息是否已经提交(commit)。

虽然 Kafka 无法确定网络故障期间发生了什么,但是 producer 可以 retry 多次,确保消息已经正确传输到 broker 中,所以目前 Kafka 实现的是 at least once。

2、kafka 幂等性

所谓幂等性,就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用 Kafka 的幂等性功能就可以避免这种情况。

3、kafka 幂等性是有条件的:

  • 1)只能保证 Producer 在单个会话内不丢不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重)。

  • 2)幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

4、Producer 使用幂等性的示例非常简单,与正常情况下 Producer 使用相比变化不大,只需要把 Producer 的配置 enable.idempotence 设置为 true 即可。

二、kafka 事务 001

1、kafka 事务 场景

幂等性并不能跨多个分区运作,而事务可以弥补这个缺憾,事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功部分失败的可能。

为了实现事务,应用程序必须提供唯一的 transactionId,这个参数通过客户端程序来进行设定。

properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
  • 1

2、kafka 事务 前期准备

事务要求生产者开启幂等性特性,因此通过将 transaction.id 参数设置为非空从而开启事务特性的。

同时需要将 ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG 设置为 true (默认值为 true),如果显示设置为 false,则会抛出异常。

3、打开 idea 创建 artifactId 名为 kafka_learn 的 maven 工程。


	--> idea --> File 
	--> New --> Project 
	--> Maven 
		Project SDK: ( 1.8(java version "1.8.0_131" ) 
	--> Next 
	--> Groupld : ( djh.it )
		Artifactld : ( kafka_learn )
		Version : 1.0-SNAPSHOT
	--> Name: ( kafka_learn )
		Location: ( ...\kafka_learn\ )	
	--> Finish
	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4、在 kafka_learn 工程的 pom.xml 文件中导入依赖坐标。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>djh.it</groupId>
    <artifactId>kafka_learn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka_learn</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath></relativePath>
    </parent>

    <properties>
        <java.version>8</java.version>
<!--        <scala.version>2.11</scala.version>-->
        <scala.version>2.12</scala.version>
        <slf4j.version>1.7.21</slf4j.version>
<!--        <kafka.version>2.0.0</kafka.version>-->
        <kafka.version>2.8.0</kafka.version>
        <lombok.version>1.18.8</lombok.version>
        <junit.version>4.11</junit.version>
        <gson.version>2.2.4</gson.version>
        <protobuff.version>1.5.4</protobuff.version>
<!--        <spark.version>2.3.1</spark.version>-->
        <spark.version>2.4.8</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.version}</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>${protobuff.version}</version>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>${protobuff.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.4</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.9.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
<!-- kafka_learn\pom.xml -->
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170

5、在 kafka_learn 工程中,创建 生产者 类 ProducerTransactionSend.java 演示事务。


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter7\ProducerTransactionSend.java
 *
 *  2024-6-24 创建 生产者 类 ProducerTransactionSend.java 演示事务。
 */
package djh.it.kafka.learn.chapter7;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerTransactionSend {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";
    private static final String topic = "heima";
    //提供唯一的事务 transactionId
    private static final String transactionId = "transactionId";

    public static void main( String[] args ) {
        Properties properties = new Properties();
        //1)设置 key 序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //2)设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        //3)设置值 value 序列化器
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //4)设置集群地址
        properties.put("bootstrap.servers", brokerList);
        //properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        //5)设置事务唯一的 transactionId
       properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

        //6)生产者开启幂等性
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        //初始化事务
        producer.initTransactions();
        //开启事务
        producer.beginTransaction();

        try{
            //处理业务逻辑
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic, "transaction-message-01");
            producer.send(record1);

//            //模拟异常,出现异常会统一回滚事务,一个消息也收不到
//            System.out.println(1/0);

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic, "transaction-message-02");
            producer.send(record2);

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic, "transaction-message-03");
            producer.send(record3);

            //提交事务
            producer.commitTransaction();

        }catch (Exception e){
            //回滚事务
            producer.abortTransaction();
        }
        producer.close();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

6、在 kafka_learn 工程中,创建 消费者类 ConsumerFastStart7.java 进行事务演示。


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter7\ConsumerFastStart7.java
 *
 *  2024-6-24 创建 消费者类 ConsumerFastStart7.java 进行事务演示。
 */
package djh.it.kafka.learn.chapter7;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerFastStart7 {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    private static final String groupId = "group.demo";

    public static void main( String[] args ) {
        Properties properties = new Properties();
        //1)设置 key 序列化器 -- 优化代码
        //properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //2)设置值序列化器 -- 优化代码
        //properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //3)设置集群地址 -- 优化代码
        //properties.put("bootstrap.servers", brokerList);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        //properties.put("group.id", groupId);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        while (true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,String> record : records){
                System.out.println(record.value());
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

在这里插入图片描述

三、kafka 事务 002

事务案例演示说明:

1)模拟异常, transaction-message-01 发送成功之后,出现异常,事务回滚。
System.out.println(1/0);

2)虽然 transaction-message-01 发送成功,但是 消费端一个消息也收不到。

演示事务.png

四、kafka 控制器

1、kafka 控制器

在 Kafka 集群中会有一个或者多个 broker,其中有一个 broker 会被选举为控制器 (KafkaController),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。

2、Kafka 中的控制器选举的工作依赖于 Zookeeper,成功竞选为控制器的 broker 会在 Zookeeper 中创建 /controller 这个临时(EPHEMERAL)节点。

3、Zoolnspector 管理

使用 zookeeper 图形化的客户端工具(ZooInspector)提供的 jar 来进行管理,启动如下:


- 1、定位到jar 所在目录
- 2、运行 jar 文件 java -jar zookeeper-dev-Zoolnspector.jar
- 3、连接 Zookeeper
  • 1
  • 2
  • 3
  • 4

ZooInspector-1.png

ZooInspector-2.png

4、其中 version 在目前版本中固定为 1,brokerid 表示称为控制器的 broker 的 id 编号,timestamp 表示竞选称为控制器时的时间戳。

5、在任意时刻,集群中有且仅有一个控制器。每个 broker 启动的时候会去尝试去读取 /controller 节点的 brokerid 的值,如果读取到 brokerid 的值不为 -1,则表示已经有其它 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;如果 Zookeeper 中不存在 /controller 这个节点,或者这个节点中的数据异常,那么就会尝试去创建 /controller 这个节点,当前 broker 去创健节点的时候,也有可能其他 broker 同时去尝试创建这个节点,只有创建成功的那个 broker 才会成为控制器,而创建失败的 broker 则表示竞选失败。每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activecontrollerld。

6、Zookeeper 中还有一个与控制器有关的 /controller_epoch 节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的 controller_epoch 值。controller_epoch 用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。

7、controller_epoch 的初始值为 1,即集群中第一个控制器的纪元为 1,当控制器发生变更时,没选出一个新的控制器就将该字段值加 1。每个和控制器交互的请求都会携带上 controller_epoch 这个字段,如果请求的 controller_epoch 值小于内存中的 controller_epoch 值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的 controller_epoch 值大于内存中的 controller_epoch 值,那么则说明已经有新的控制器当选了。由此可见,Kafka 通过 controller_epoch 来保证控制器的唯一性,进而保证相关操作的一致性。

8、具备控制器身份的 broker 需要比其他普通的 broker 多一份职责,具体细节如下:

  • 1、监听 partition 相关的变化。
  • 2、监听 topic 相关的变化。
  • 3、监听 broker 相关的变化
  • 4、从 Zookeeper 中读取获取当前所有与 topic、partition 以及 broker 有关的信息并进行相应的管理。

五、kafka 可靠性保证

1、kafka 可靠性保证

kafka 可靠性保证: 确保系统在各种不同的环境下能够发生一致的行为。

2、Kafka 的保证

1)保证分区消息的顺序

  • 如果使用同一个生产者往同一个分区写入消息,而且消息B在消息A之后写入。
  • 那么 Kafka 可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取消息A再读取消息B。

2)只有当消息被写入分区的所有同步副本时(文件系统缓存),它才被认为是已提交

  • 生产者可以选择接收不同类型的确认,控制参数 acks

3)只要还有一个副本是活跃的,那么已提交的消息就不会丢失。

4)消费者只能读取已经提交的消息。

3、kafka 失效副本

怎么样判定一个分区是否有副本是处于同步失效状态的呢?

1)从Kafka 0.9.x 版本开始通过唯一的一个参数 replica.lag.time.max.ms(默认大小为10,000)来控制,当 ISR 中的一个 follower 副本滞后 leader 副本的时间超过参数 replica.lag.time.max.ms 指定的值时即判定为副本失效,需要将此 follower 副本剔出除 ISR 之外。具体实现原理很简单,当 follower 副本将 leader 副本的 LEO(LogEnd Offset,每个分区最后一条消息的位置)之前的日志全部同步时,则认为该 follower 副本已经追赶上 leader 副本,此时更新该副本的 lastCaughtUpTimeMs 标识。

2)Kafka 的副本管理器(ReplicaManager)启动时会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数 replica.lag.time.max.ms 指定的值。千万不要错误的认为 follower 副本只要拉取 leader 副本的数据就会更新 lastCaughtUpTimeMs,试想当 leader 副本的消息流入速度大于 follower 副本的拉取速度时,follower 副本一直不断的拉取leader副本的消,息也不能与 leader 副本同步,如果还将此 follower 副本置于 ISR 中,那么当 leader 副本失效,而选取此 follower 副本为新的 leader 副本,那么就会有严重的消息丢失。

4、kafka 副本复制

1)Kafka 中的每个主题分区都被复制了n次,其中的n是主题的复制因子(replication factor)。这允许 Kafka 在集群服务器发生故障时自动切换到这些副本,以便在出现故障时消息仍然可用。

2)Kafka 的复制是以分区为粒度的。分区的预写日志被复制到n个服务器。

3)在n个副本中,一个副本作为 leader,其他副本成为 followers。顾名思义,producer 只能往 leader 分区上写数据(读也只能从leader分区上进行),followers 只按顺序从 leader 上复制日志。

4)一个副本可以不同步 Leader 有如下几个原因:

  • 1、慢副本: 在一定周期时间内 follower 不能追赶上 leader。最常见的原因之一是 I/0 瓶颈导致 follower 追加复制消息慢于从 leader 拉取速度。

  • 2、卡住副本: 在一定周期时间内 follower 停止从 leader 拉取请求。follower replica 卡住了是由于 GC 暂停或 follower失效或死亡。

  • 3、新启动副本: 当用户给主题增加副本因子时,新的 follower 不在同步副本列表中,直到他们完全赶上了 leader 日志。

5、如何确定副本是滞后的?

副本是滞后replica.lag.max.messages=4.png

1)在服务端现在只有一个参数需要配置 replica.lag.time.max.ms。这个参数解释 replicas 响应 partition leader 的最长等待时间。

2)检测卡住或失败副本的探测–如果一个 replica 失败导致发送拉取请求时间间隔超过 replica.lag.time.max.ms。Kafka 会认为此 replica 已经死亡会从同步副本列表从移除。

3)检测慢副本机制发生了变化–如果一个 replica 开始落后 leader 超过 replica.lag.time.max.ms。Kafka 会认为太缓慢并且会从同步副本列表中移除。除非 replica 请求 leader 时间间隔大于 replica.lag.time.max.ms,

4)因此即使 leader 使流量激增和大批量写消息。Kafka 也不会从同步副本列表从移除该副本。

六、kafka 一致性保证

1、kafka 一致性保证

  • 在 leader 宕机后,只能从 ISR 列表中选取新的 leader,无论 ISR 中哪个副本被选为新的 leader,它都知道 HW 之前的数据,可以保证在切换了leader 后,消费者可以继续看到 HW 之前已经提交的数据。

  • HW 的截断机制: 选出了新的 leader,而新的 leader 并不能保证已经完全同步了之前 leader的 所有数据,只能保证 HW 之前的数据是同步过的,此时所有的 follower 都要将数据截断到 HW 的位置,再和新的 leader 同步数据,来保证数据一致。

  • 当宕机的 leader 恢复,发现新的 leader 中的数据和自己持有的数据不一致,此时宕机的 leader 会将自己的数据截断到宕机之前的 hw 位置,然后同步新 leader 的数据。宕机的 leader 活过来也像 follower 一样同步数据,来保证数据的一致性。

2、Leader Epoch 引用 :数据丢失场景

数据丢失场景.png

3、Leader Epoch 引用 :数据出现不一致场景

数据出现不一致场景.png

4、造成数据丢失场景和数据出现不一致场景两个问题的根本原因

1)在于 HW 值被用于衡量副本备份的成功与否以及在出现 failture 时作为日志截断的依据,

2)HW 值的更新是异步延迟的,特别是需要额外的 FETCH 请求处理流程才能更新,故这中间发生的任何崩溃都可能导致 HW 值的过期。

5、Kafka0.11 引入了leader epoch 来取代 HW 值。

Leader 端多开辟一段内存区域专门保存 leader 的 epoch 信息,这样即使出现 数据丢失场景和数据出现不一致场景两个场景也能很好地规避这些问题。

6、leader epoch

1)所谓 leader epoch 实际上是一对值:(epoch,offset)。epoch 表示 leader 的版本号,从0开始,当 leader 变更过1次时 epoch 就会+1,而 offset 则对应于该 epoch 版本的 leader 写入第一条消息的位移。

2)因此假设有两对值:

(0, 0)
(1,120)

则表示第一个 leader 从位移0开始写入消息;共写了120条[0,119];而第二个leader版本号是1,从位移120处开始写入消息。

leader broker 中会保存这样的一个缓存,并定期地写入到一个 checkpoint 文件中。

7、kafka 避免数据丢失的场景

kafka避免数据丢失的场景.png

8、kafka 避免数据出现不一致场景

kafka避免数据出现不一致场景.png

七、kafka 稳定性 总结

1、kafka 消息重复的场景及解决方案

1.1 kafka 生产者端重复

1)kafka 生产者端重复 问题描述:

生产发送的消息没有收到正确的 broke 响应,导致 producer 重试。producer 发出一条消息,broke 落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后 producer 收到一个可恢复的 Exception 重试消息导致消息重复。

2)kafka 生产者端重复 解决方案:

  • 1、启动 kafka 的幂等性

要启动 kafka 的幂等性,无需修改代码,默认为关闭,需要修改配置文件: enable.idempotence=true 同时要求 ack=all 目 retries>1。

  • 2、ack=0,不重试。

可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。

1.2 kafka 消费者端重复

1)kafka 消费者端重复 根本原因:

数据消费完没有及时提交 offset 到 broker。

2)kafka 消费者端重复 解决方案:

  • 1、取消自动自动提交

每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。

  • 2、下游做幂等

一般的解决方案是让下游做幂等或者尽量每消费一条消息都记录 offset,对于少数严格的场景可能需要把 offset 或唯一 ID,例如订单 ID 和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位点做乐观锁拒绝掉旧位点的数据更新。

2、consumer_offsets

consumer_offsets 是一个内部 topic,对用户而言是透明的,除了它的数据文件以及偶尔在日志中出现这两点之外,用户一般是感觉不到这个 topic 的。不过我们的确知道它保存的是 Kafka 新版本 consumer 的位移信息。

3、kafka 何时创建 consumer_offsets

一般情况下,当集群中第一有消费者消费消息时会自动创建主题_consumer_offsets,分区数可以通过 offsets.topic.num.partitions 参数设定,默认值为 50。

4、kafka 稳定性 总结

1)Kafka 幂等性
2)kafka 事务的处理
3)kafka 可靠性保证
4)kafka 一致性保证
5)kafka 消息重复以及决方案。

上一节关联链接请点击
# Kafka_深入探秘者(6):kafka 物理存储

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/785251
推荐阅读
相关标签
  

闽ICP备14008679号