当前位置:   article > 正文

kafka笔记(二):生产者-同步发送/异步发送/生产者分区/数据可靠性/数据去重/消息发送流程_linger.ms

linger.ms

目录

kafka生产者

生产者消息发送流程

发送原理

生产者重要参数列表

异步发送API

普通异步发送

带回调函数的异步发送

同步发送API

生产者分区

分区好处

生产者发送消息的分区策略

自定义分区器

生产经验—提高生产者的吞吐量

生产经验—数据可靠性

生产经验—数据去重

数据传递语义

幂等性

生产者事务

生产经验—数据有序

生产经验—数据乱序


kafka生产者

生产者消息发送流程

发送原理

在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k。

linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。

应答acks:

0:生产者发送过来的数据,不需要等数据落盘应答。 

1:生产者发送过来的数据,Leader收到数据后应答。

-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。

生产者重要参数列表

参数名称描述
bootstrap.servers

生产者连接集群所需的broker地址清单。例如:hadoop01:9092,hadoop02:9092,hadoop03:9092,可以设置1个或者多个,中间用逗号隔开。注意,这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。

key.serializer和value.serializer

指定发送消息的key和value的序列化类型。(注:要写全列名)

buffer.memory

RecordAccumulator缓冲区总大小,默认32M。

batch.size

缓冲区一批数据的最大值,默认16k。适当增加该值可以提高吞吐量,但是如果该值设置太大会导致数据传输延迟增加。

linger.ms

如果数据迟迟未达到batch.size,sender等待linger.time设置的时间到了就会发送数据。单位ms,默认值是0ms:表示没有延迟。

生产环境建议该值大小为5-100ms之间。

acks

0:生产者发送过来的数据,不需要等数据落盘应答。

1:生产者发送过来的数据,Leader收到数据后应答。

-1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

max.in.flight.requests.per.connection

允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是1-5的数字。

retry.backoff.ms

两次重试之间的时间间隔,默认是100ms。

enable.idempotence

是否开启幂等性,默认true,开启幂等性。

compression.type

生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。

支持压缩类型:none、gzip、snappy、lz4和zstd。

retries

当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值:2147483647。

如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。

异步发送API

普通异步发送

(1)需求:创建kafka生产者,采用异步发送的方式发送到kafka broker。

(2)代码编写:

1)创建maven工程

2)导入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>2.4.1</version>
  6. </dependency>
  7. </dependencies>
  8. <build>
  9. <plugins>
  10. <plugin>
  11. <artifactId>maven-compiler-plugin</artifactId>
  12. <version>3.6.0</version>
  13. <configuration>
  14. <source>1.8</source>
  15. <target>1.8</target>
  16. </configuration>
  17. </plugin>
  18. <plugin>
  19. <artifactId>maven-assembly-plugin</artifactId>
  20. <configuration>
  21. <descriptorRefs>
  22. <descriptorRef>jar-with-dependencies</descriptorRef>
  23. </descriptorRefs>
  24. </configuration>
  25. <executions>
  26. <execution>
  27. <id>make-assembly</id>
  28. <phase>package</phase>
  29. <goals>
  30. <goal>single</goal>
  31. </goals>
  32. </execution>
  33. </executions>
  34. </plugin>
  35. </plugins>
  36. </build>

3)创建包

4)编写API代码

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import java.util.Properties;
  5. public class CustomProducer {
  6. public static void main(String[] args) {
  7. /**
  8. 编写不带回调函数的API代码
  9. */
  10. //1.创建kafka生产者的配置对象
  11. Properties properties = new Properties();
  12. //2.给kafka配置对象添加配置信息:bootstrap.server
  13. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
  14. //key,value序列化:key.serializer,value.serializer
  15. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  16. "org.apache.kafka.common.serialization.StringSerializer");
  17. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  18. "org.apache.kafka.common.serialization.StringSerializer");
  19. //3.创建kafka生产者对象
  20. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  21. //4.调用send方法,发送信息
  22. for (int i = 0;i < 5;i++) {
  23. kafkaProducer.send(new ProducerRecord<>("first","hello kafka" + i));
  24. }
  25. //5.关闭资源
  26. kafkaProducer.close();
  27. }
  28. }

(3)测试

1)开启kafka

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)执行idea的代码,查看kafka是否接收到消息

带回调函数的异步发送

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exceptionnull说明消息发送成功,如果Exception不为null说明消息发送失败。

注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

(1)代码编写

  1. import org.apache.kafka.clients.producer.*;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import java.util.Properties;
  4. public class CustomProducerCallBack {
  5. public static void main(String[] args) throws InterruptedException {
  6. /**
  7. * 编写回调函数异步发送API代码
  8. */
  9. //1.创建kafka生产者环境
  10. Properties properties = new Properties();
  11. //2.给kafka配置对象添加配置信息:bootstrap.server
  12. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
  13. //key.value序列化:key.serializer,value.serializer
  14. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  15. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  16. //3.创建kafka生产者对象
  17. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  18. //4.调用send方法,发送信息
  19. for (int a = 0;a < 5; a++) {
  20. kafkaProducer.send(new ProducerRecord<>("first", "hello mykafka" + a), new Callback() {
  21. //在kafkaProducer接收到ack时调用,异步调用
  22. @Override
  23. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  24. if (e == null) {
  25. System.out.println("主题:"+ recordMetadata.topic() + "->"+"分区:"+recordMetadata.partition());
  26. } else {
  27. //出现异常打印
  28. e.printStackTrace();
  29. }
  30. }
  31. });
  32. //延迟一会会看到数据发往不同的分区
  33. Thread.sleep(2);
  34. }
  35. //5.关闭环境
  36. kafkaProducer.close();
  37. }
  38. }

(2)测试

1)开启消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)执行idea的代码,查看控制台中是否有接收到回调消息

3)查看kafka是否接收到消息

同步发送API

只需在异步发送的基础上再调用一下get()方法即可。

(1)代码编写

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import java.util.Properties;
  6. import java.util.concurrent.ExecutionException;
  7. public class CustomProducerS {
  8. public static void main(String[] args) throws ExecutionException,InterruptedException {
  9. /**
  10. * 同步发送API
  11. */
  12. //1.创建kafka生产者的配置环境
  13. Properties properties = new Properties();
  14. //2.给kafka配置对象添加配置信息
  15. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
  16. //key,value序列化:key.serializer,value.serializer
  17. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  18. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  19. //3.创建kafka生产者对象
  20. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  21. //4.调用send方法,发送消息
  22. for (int i = 0; i < 8; i++) {
  23. //异步发送(默认)
  24. //kafkaProducer.send(new ProducerRecord<>("first","hello!"+i));
  25. //同步发送
  26. kafkaProducer.send(new ProducerRecord<>("first","kafka"+i)).get();
  27. }
  28. //5.关闭资源
  29. kafkaProducer.close();
  30. }
  31. }

(2)测试

1)开启消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)执行idea的代码,查看是否有接收到消息

生产者分区

分区好处

(1)便于合理使用存储资源:每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度:生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据。

生产者发送消息的分区策略

(1)默认分区器:DefaultPartitioner

1)指明partition的情况下,直接将指明的值作为partition值;

例如partition=0,所有数据写入分区0。

2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;

例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那

么key1对应的value1写入1号分区,key2对应的value2写入0号分区。

3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。

(2)案例一

将数据发送指定partition,如:将数据指定发送分区0中。

1)代码

  1. import org.apache.kafka.clients.producer.*;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import java.util.Properties;
  4. import java.util.concurrent.ExecutionException;
  5. public class CustomProducerPartitions {
  6. public static void main(String[] args) throws ExecutionException,InterruptedException{
  7. /**
  8. * 将数据发送到指定分区
  9. * 将所有数据发送到分区0中
  10. */
  11. //1.创建kafka生产者的配置对象
  12. Properties properties = new Properties();
  13. //2.给kafka配置对象添加配置信息
  14. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
  15. //key,value序列化:key.serializer,value.serializer
  16. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  17. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  18. //3.创建kafka生产者对象
  19. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  20. //4.调用send方式,方式消息
  21. for (int i = 0; i < 5; i++) {
  22. //指定数据发送到1分区
  23. kafkaProducer.send(new ProducerRecord<>("first", 0, "", "hi spark" + i), new Callback() {
  24. @Override
  25. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  26. if (e == null) {
  27. System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
  28. } else {
  29. e.printStackTrace();
  30. }
  31. }
  32. });
  33. }
  34. //5.关闭环境
  35. kafkaProducer.close();
  36. }
  37. }

2)测试

开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

执行idea中代码,观察控制台和kafka的消息

控制台

kafka

(3)案例二

没有指定partition值但是有key的情况,将key的hash值与topic的partition数进行取余得到partition值

1)代码

  1. import org.apache.kafka.clients.producer.*;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import java.util.Properties;
  4. public class CustomProducerPartitions2 {
  5. public static void main(String[] args) {
  6. /**
  7. * 没有指定partition值但是有key的情况
  8. * 将key的hash值与topic的partition数进行取余得到partition值
  9. */
  10. //1.创建kafka生产者的配置对象
  11. Properties properties = new Properties();
  12. //2.给kafka配置对象添加配置信息
  13. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
  14. //key,value序列化:key.serializer,value.serializer
  15. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  16. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  17. //3.创建kafka生产者对象
  18. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  19. //4.调用send方式,方式消息
  20. for (int i = 0; i < 5; i++) {
  21. //
  22. kafkaProducer.send(new ProducerRecord<>("first", "a","world" + i), new Callback() {
  23. @Override
  24. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  25. if (e == null) {
  26. System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
  27. } else {
  28. e.printStackTrace();
  29. }
  30. }
  31. });
  32. }
  33. //5.关闭环境
  34. kafkaProducer.close();
  35. }
  36. }

2)测试

开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

key=a        查看控制台结果

key=b        查看控制台结果

key=c        查看控制台结果

 key=f        查看控制台结果

自定义分区器

根据需求可以自己重新实现分区器

(1)需求:实现一个分区器,发送的数据出现jeffry就发送到分区1,否则发送到分区2.

(2)实现:

1)代码

partition:

  1. import org.apache.kafka.clients.producer.*;
  2. import org.apache.kafka.common.Cluster;
  3. import java.util.Map;
  4. /**
  5. * 1.实现接口partitioner
  6. * 2.实现三个方法:partition,close,configure
  7. * 3.编写partition方法,返回分区号
  8. */
  9. public class MyPartition implements Partitioner {
  10. /**
  11. *
  12. * @param topic 主题
  13. * @param key 消息的key
  14. * @param keyBytes 消息的key序列化后的字节数组
  15. * @param value 消息的value
  16. * @param valueBytes 消息的value序列化后的字节数组
  17. * @param cluster 集群元数据可以查看分区信息
  18. * @return
  19. */
  20. @Override
  21. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  22. //获取消息
  23. String msgValue = value.toString();
  24. //创建partition
  25. int partition;
  26. //判断消息是否包含jeffry
  27. if (msgValue.contains("jeffry")) {
  28. partition = 1;
  29. } else {
  30. partition = 2;
  31. }
  32. //返回分区号
  33. return partition;
  34. }
  35. //关闭资源
  36. @Override
  37. public void close() {
  38. }
  39. //配置方法
  40. @Override
  41. public void configure(Map<String, ?> configs) {
  42. }
  43. }

测试自定义partition:

  1. import org.apache.kafka.clients.producer.*;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import java.util.Properties;
  4. public class CustomProducerMyPartitions {
  5. public static void main(String[] args) {
  6. /**
  7. * 测试自定义partition
  8. */
  9. Properties properties = new Properties();
  10. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
  11. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  12. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  13. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zj.kafka.MyPartition");
  14. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  15. for (int i = 0;i < 5;i++) {
  16. kafkaProducer.send(new ProducerRecord<>("first", "jeffry" + i), new Callback() {
  17. @Override
  18. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  19. if (e == null) {
  20. System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
  21. } else {
  22. e.printStackTrace();
  23. }
  24. }
  25. });
  26. }
  27. kafkaProducer.close();
  28. }
  29. }

2)测试

开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

在idea启动执行程序

查看控制台的信息

 查看kafka的信息

 

生产经验—提高生产者的吞吐量

batch.size:批次大小,默认16k;

linger.ms:等待时间,修改为5-100ms;

compression.type:压缩形式为snappy;

RecordAccumulator:缓冲区大小,修改为64m;

(1)代码

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import java.util.Properties;
  6. public class Parameters {
  7. public static void main(String[] args) {
  8. //创建kafka生产者配置对象
  9. Properties properties = new Properties();
  10. //给kafka配置对象添加配置信息
  11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
  12. //key,value序列化
  13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  15. //batch.size:批次大小,默认16K
  16. properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
  17. //linger.ms:等待时间,默认0
  18. properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
  19. //RecordAccumulator:缓冲区大小,默认32M buffer。memory
  20. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
  21. //compression.type:压缩,默认none,可以配置gzip、snappy、lzo、zstd
  22. properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
  23. //创建kafka生产者对象
  24. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  25. //调用send方法,发送信息
  26. for (int i = 0;i < 5;i++) {
  27. kafkaProducer.send(new ProducerRecord<>("first","jeffryOne" + i));
  28. }
  29. //关闭环境
  30. kafkaProducer.close();
  31. }
  32. }

(2)测试

1)开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)在idea中启动执行程序

观察kafka是否接收到消息

生产经验—数据可靠性

(1)ack应答原理

 

数据可靠性分析:

如果分区副本设置为1个,或者ISR里应答的最小副本数量设置为1( min.insync.replicas默认为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。

数据完全可靠条件 = ACK级别为-1+分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

可靠性总结:

acks=0,生产者发送过来数据就不管了,可靠性差,效率高;

acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;

acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;

在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

数据重复分析:

acks: -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。

(2)代码编写

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import java.util.Properties;
  6. public class CustomProducerAck {
  7. public static void main(String[] args) {
  8. //创建kafka生产者的配置对象
  9. Properties properties = new Properties();
  10. //给kafka配置对象添加配置信息
  11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
  12. //keyvalue序列化
  13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  15. //设置acks
  16. properties.put(ProducerConfig.ACKS_CONFIG,"all");
  17. //重试次数retries。默认是int最大的值:2147483647
  18. properties.put(ProducerConfig.RETRIES_CONFIG,5);
  19. //创建kafka生产者对象
  20. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  21. //调用send方法,发送信息
  22. for (int i = 0;i < 5;i++) {
  23. kafkaProducer.send(new ProducerRecord<>("first","jeffry" + i));
  24. }
  25. //关闭环境
  26. kafkaProducer.close();
  27. }
  28. }

(3)测试

1)开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)在idea中启动执行程序,查看kafka情况

生产经验—数据去重

数据传递语义

(1)至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2;

At Least Once可以保证数据不丢失,但是不能保证数据不重复。

(2)最多一次(At Most Once)= ACK级别设置为0;

At Most Once可以保证数据不重复,但是不能保证数据不丢失。

(3)精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

幂等性

(1)幂等性原理

幂等性:指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次 = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数>=2)。

判断重复数据的标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。

PID是Kafka每次重启都会分配一个新的;

Partition表示分区号;

Sequence Number是单调自增的。

幂等性只能保证在单分区单会话内不重复。

(2)如何使用幂等性

开启enable.idempotence默认为true,false关闭。

生产者事务

(1)kafka事务原理

注:开启事务必须开启幂等性。 

 (2)kafka事务的5个API

  1. //1初始化事务
  2. void initTransactions();
  3. //2开启事务
  4. void beginTransaction() throws ProducerFencedException;
  5. //3在事务内提交已经消费的偏移量
  6. void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
  7. String consumerGroupId) throws ProducerFencedException;
  8. //4提交事务
  9. void commitTransaction() throws ProducerFencedException;
  10. //5放弃事务
  11. void abortTransaction() throws ProducerFencedException;

(3)单个producer,使用事务保证消息的仅一次发送

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import java.util.Properties;
  6. public class Transaction {
  7. public static void main(String[] args) throws InterruptedException {
  8. //1.创建kafka生产者的配置对象
  9. Properties properties = new Properties();
  10. //2.给kafka配置对象添加配置信息
  11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
  12. //keyvalue序列化
  13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  15. //设置事务id(必须),事务id可以任意起名
  16. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionis-0");
  17. //3.创建kafka生产者对象
  18. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  19. //初始化事务
  20. kafkaProducer.initTransactions();
  21. //开启事务
  22. kafkaProducer.beginTransaction();
  23. try {
  24. //4.调用send方法,发送信息
  25. for (int i = 0;i < 5;i++) {
  26. //发送消息
  27. kafkaProducer.send(new ProducerRecord<>("first","tom" + i));
  28. }
  29. int i = 1/0;
  30. //提交事务
  31. kafkaProducer.commitTransaction();
  32. } catch (Exception e) {
  33. //中止事务
  34. kafkaProducer.abortTransaction();
  35. } finally {
  36. //5.关闭环境
  37. kafkaProducer.close();
  38. }
  39. }
  40. }

生产经验—数据有序

单分区内数据是有序的;

多发区内数据是无序的;

生产经验—数据乱序

 (1)kafka在1.x版本之前保证数据单分区有序,条件如下:  

 max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。

(2)kafka在1.x及以后版本保证数据单分区有序,条件如下:

1)开启幂等性

max.in.flight.requests.per.connection需要设置小于等于5。

2)未开启幂等性

max.in.flight.requests.per.connection需要设置为1。

原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。    

如果开启了幂等性且缓存的请求个数小于5个。会在服务端重新排序。

本文为学习笔记!!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/1000622
推荐阅读
相关标签
  

闽ICP备14008679号