当前位置:   article > 正文

【云原生进阶之PaaS中间件】第三章Kafka-4.2-生产者工作原理剖析

【云原生进阶之PaaS中间件】第三章Kafka-4.2-生产者工作原理剖析

1 kafka生产者工作模式

1.1 生产者消息发送流程

1.1.1 发送原理

        Producer首先调用send方法进行发送,首先会经过拦截器,可以对数据进行一些加工处理。随后会经过序列化,kafka并没有采用Java提供的序列化器,而是自己实现的序列化器,但是Java提供的序列化器,会在原有数据的基础上,增加很多的用于安全校验的数据,在大数据的场景下,每次传输的数据量很大,如果在此基础上还要加入大量用于安全校验的数据,严重的影响了效率,所以kafka等中间件,自己实现了序列化器,仅仅进行简单的校验,增加了效率。

        随后经过分区器(分区器实际上是将数据发送到了缓冲队列中,缓冲队列是一个双端队列,其内部包含内存池,避免频繁的申请和释放内存),因为kafka可以对topic进行分区,所以发送时就需要确定向哪个分区发送信息,就由分区器定义的规则来发送,一个分区对应一个队列,这些队列都是在内存中创建的,总大小默认32M,每一批次默认大小32K。

        在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

        发送时,以分区节点为key,即broker1,broker2为key,请求为value进行发送,形成一个请求。请求发送到某个broker中,如果第一个请求发送到broker1,broker1没有即使的应答,允许继续发送第二个请求,直到五个请求都没有得到应答,后续的请求不会再发送,直到得到了请求的应答才继续发送。

        从图中的流程可以看出,生产者和kafka集群之间还有一个RecordAccumulator队列,默认大小是32M,topic分区的话,producer会对应有一个分区器,数据在进入中间队列前,已经被分区器进行了分区,sender()方法在发送数据时,就直接根据分区进行拉取了,拉取时有两个参数,也就是调优参数:

  1. batch.size :也就是批大小,只有数据累计到batch.size后,sender才会发送数据,默认16k ;
  2. linger.ms :也就是等待时间,如果数据未达到batch.size,sender等待linger.ms设置的时间就会发送数据,单位ms,默认值就是0ms,就是有了一条数据直接发(默认为0是因为kafka要接实时数仓,所以设置为0);

        kafka集群收到请求之后会涉及到一个应答机制,应答级别分为0、1、-1:

  • 0:生产者发送过来的数据,不需要等待数据落盘应答;
  • 1:生产者发送过来的数据,Leader(数据落盘)收到后应答,副本有没有无所谓;
  • -1(all) :生产者发送过来的数据,Leader和ISR里面的所有节点收齐数据后应答,-1和all等价。

        Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follow + Leader集合(leader:0,ISR:0,1,2),如果Follower长时间未向Leader发送通信请求或同步数据,则该Follow将被踢出ISR。改时间阈值由replica.lag.time.max.ms参数设定,默认30s,例如如果2超时,(leader:0,ISR:0,1)。

        在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景:

        kafka集群应答之后,如果成功,进行数据的清理,如果失败,进行重试,默认重试次数是int的最大值 :

        重复数据的评判标准:具有相同主键的信息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。

        所以幂等性只能保证的是在单分区单会话内不重复。如果想保证数据一定不重复,就需要开启事务。

        使用幂等性:开启参数enable.idempotence默认为true,即默认为开启。

1.1.4 生产者重要参数列表

1.2 异步发送API

        生产者代码中有3必须,IP即连接地址、key和value的序列化器。

1.2.1 普通异步发送流程

创建maven项目

导入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>3.0.0</version>
  6. </dependency>
  7. </dependencies>

代码编写

  1. package com.atguigu.kafka.producer;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.Properties;
  6. public class CustomProducer {
  7. public static void main(String[] args) {
  8. // 1. 创建kafka生产者的配置对象
  9. Properties properties = new Properties();
  10. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
  11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  12. // key,value序列化(必须):
  13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  15. // 3. 创建kafka生产者对象
  16. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
  17. // 4. 调用send方法,发送消息
  18. for (int i = 0; i < 10; i++) {
  19. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu"));
  20. }
  21. // 5. 关闭资源
  22. kafkaProducer.close();
  23. }
  24. }

1.2.2 带回调函数的异步发送

        回调函数是实现应答机制的函数.

  1. package com.atguigu.kafka.producer;
  2. import org.apache.kafka.clients.producer.*;
  3. import java.util.Properties;
  4. public class CustomProducerCallBack {
  5. public static void main(String[] args) {
  6. // 1. 创建kafka生产者的配置对象
  7. Properties properties = new Properties();
  8. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
  9. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  10. // key,value序列化(必须):
  11. // 序列化器的serialization是一个接口,找到他的实现类
  12. // 我们一般都是使用String
  13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  15. // 3. 创建kafka生产者对象
  16. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
  17. // 4. 调用send方法,发送消息
  18. for (int i = 0; i < 10; i++) {
  19. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
  20. new Callback() {
  21. @Override
  22. public void onCompletion(RecordMetadata metadata, Exception exception) {
  23. //(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法
  24. //(2)消息发送失败 exception != null 也会调用该方法
  25. if (exception == null) {
  26. System.out.println(metadata);//使用打印演示
  27. }else{
  28. exception.printStackTrace();//打印异常信息
  29. }
  30. }
  31. });
  32. }
  33. // 5. 关闭资源
  34. kafkaProducer.close();
  35. }
  36. }

1.3 同步发送API

  1. package com.atguigu.kafka.producer;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.Properties;
  6. import java.util.concurrent.ExecutionException;
  7. public class CustomProducerSync {
  8. public static void main(String[] args) throws ExecutionException, InterruptedException {
  9. // 1. 创建kafka生产者的配置对象
  10. Properties properties = new Properties();
  11. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
  12. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  13. // key,value序列化(必须):
  14. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  15. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  16. // 3. 创建kafka生产者对象
  17. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
  18. // 4. 调用send方法,发送消息
  19. for (int i = 0; i < 10; i++) {
  20. // 默认为异步发送
  21. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));
  22. // 末尾加get为同步发送
  23. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();
  24. }
  25. // 5. 关闭资源
  26. kafkaProducer.close();
  27. }
  28. }

1.4 生产者分区

1.4.1 kafka分区的好处

        因为不同的分区分布在不同的节点上,所以便于合理使用资源,实现负载均衡。并且在不同节点上可以提高并行度。

1.4.2 生产者发送消息的分区策略

  1. 指定发送到哪一个分区,直接使用对应的分区号,不会走分区器;
  2. 不写分区号,需要走分区器,有key,按照key进行hash之后取模分区个数;
  3. 不写分区号,需要走分区器,没有key,粘性分区缓存机制;
    • 一批数据发送到随机的一个分区中,下一批数据发送到另外一个分区;
    • 如果是异步发送,数据发送的比较快,10条数据被当作一批,每一次都是一个分区;
    • 如果是同步发送,发一条数据歇一会,导致每一条数据都是不同批;

  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. import java.util.concurrent.ExecutionException;
  4. public class CustomProducerCallBackPartition {
  5. public static void main(String[] args) throws ExecutionException, InterruptedException {
  6. // 1. 创建kafka生产者的配置对象
  7. Properties properties = new Properties();
  8. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
  9. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  10. // key,value序列化(必须):
  11. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  12. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  13. // 3. 创建kafka生产者对象
  14. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
  15. // 4. 调用send方法,发送消息
  16. for (int i = 0; i < 10; i++) {
  17. // (1)指定发送到哪一个分区 直接使用对应的分区号 不会走分区器
  18. // (2) 不写分区号 需要走分区器 有key 按照key进行hash之后取模分区个数
  19. // (3) 不写分区号 需要走分区器 没有key 粘性分区缓存机制
  20. // 一批数据发送到随机的一个分区中,下一批数据发送到另外一个分区
  21. // 如果是异步发送,数据发送的比较快 10条数据被当作一批 每一次都是一个分区
  22. // 如果是同步发送,发一条数据歇一会,导致每一条数据都是不同批
  23. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
  24. new Callback() {
  25. @Override
  26. public void onCompletion(RecordMetadata metadata, Exception exception) {
  27. //(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法
  28. //(2)消息发送失败 exception != null 也会调用该方法
  29. if (exception == null) {
  30. System.out.println(metadata);
  31. }else{
  32. exception.printStackTrace();
  33. }
  34. }
  35. }).get();
  36. }
  37. //Thread.sleep(20);
  38. // 5. 关闭资源
  39. kafkaProducer.close();
  40. }
  41. }

1.4.3 自定义分区器

根据业务需求,可以自定义分区器。

假设需求:发送过来的数据中如果包含atguigu,就发往0号分区,不包含atguigu,就发往1号分区

  1. import org.apache.kafka.clients.producer.Partitioner;
  2. import org.apache.kafka.common.Cluster;
  3. import java.util.Map;
  4. // (1)实现分区器的接口
  5. public class CustomPartitioner implements Partitioner {
  6. /*
  7. 传参
  8. topic:主题 key:key值 keyBytes:key序列化之后 value:value值
  9. valueBytes:value序列化之后 cluster:集群信息 return的是分区号
  10. */
  11. @Override
  12. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  13. //
  14. String log = value.toString();
  15. if (log.contains("atguigu")) {
  16. return 0;
  17. }
  18. return 1;
  19. }
  20. @Override
  21. public void close() {
  22. }
  23. @Override
  24. public void configure(Map<String, ?> configs) {
  25. }
  26. }

然后,调用

  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class CustomProducerCallBack {
  4. public static void main(String[] args) {
  5. // 1. 创建kafka生产者的配置对象
  6. Properties properties = new Properties();
  7. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
  8. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  9. // key,value序列化(必须):
  10. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  11. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  12. // 添加定义的分区器,需要自定义分区的全类名
  13. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.partitioner.CustomPartitioner");
  14. // 3. 创建kafka生产者对象
  15. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
  16. // 4. 调用send方法,发送消息
  17. for (int i = 0; i < 10; i++) {
  18. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
  19. new Callback() {
  20. @Override
  21. public void onCompletion(RecordMetadata metadata, Exception exception) {
  22. //(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法
  23. //(2)消息发送失败 exception != null 也会调用该方法
  24. if (exception == null) {
  25. System.out.println(metadata);
  26. }else{
  27. exception.printStackTrace();
  28. }
  29. }
  30. });
  31. }
  32. // 5. 关闭资源
  33. kafkaProducer.close();
  34. }
  35. }

1.5 生产经验

1.5.1 生产者如何提高吞吐量

提高吞吐量,就是提高批次传输大小,还有就是效率问题.

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import java.util.Properties;
  5. public class CustomProducerParameters {
  6. public static void main(String[] args) {
  7. // 1. 创建kafka生产者的配置对象
  8. Properties properties = new Properties();
  9. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
  10. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  11. // key,value序列化(必须):
  12. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  13. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  14. //调优参数,还是需要根据业务需求来调整
  15. //batch.size 批次大小,默认是16k,将批次大小增大,进而提高吞吐量
  16. properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32768);
  17. //linger.ms 等待时长,默认是0ms,增加等待时长
  18. properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
  19. //双端队列大小,默认是32M,可以提高到64M
  20. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,67108864);
  21. //调整压缩格式,默认没有压缩
  22. properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
  23. // 3. 创建kafka生产者对象
  24. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
  25. // 4. 调用send方法,发送消息
  26. for (int i = 0; i < 10; i++) {
  27. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu"));
  28. }
  29. // 5. 关闭资源
  30. kafkaProducer.close();
  31. }
  32. }

1.5.2 数据可靠性

        数据可靠性基于ack应答机制。数据完全可靠的条件:Acks级别设置为-1,分区副本大于等于2,ISR应答的最小副本数大于等于2。

副本介绍

(1)Kafka副本作用:提高数据可靠性。

(2)Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

(3)Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。

(4)Kafka分区中的所有副本统称为AR(Assigned Repllicas)。

AR = ISR + OSR

  • ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。
  • OSR,表示Follower与Leader副本同步时,延迟过多的副本。

可靠性总结:

  1. acks=0,生产者数据发来,kafka集群内存接受到数据就返回ack
  2. acks=1,生产者数据发来,kafka集群中的leader落盘数据后返回ack
  3. acks=-1,生产者数据发来,kafka集群中的所有副本落盘数据后返回ack
  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import java.util.Properties;
  5. public class CustomProducerAcks {
  6. public static void main(String[] args) {
  7. // 1. 创建kafka生产者的配置对象
  8. Properties properties = new Properties();
  9. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
  10. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  11. // key,value序列化(必须):
  12. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  13. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  14. //设置应答机制acks,可以去3个值,0、1、all(相当与ask = -1)
  15. properties.put(ProducerConfig.ACKS_CONFIG, "all");
  16. //重试次数retries ,默认是int最大值,2147483647
  17. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  18. // 3. 创建kafka生产者对象
  19. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
  20. // 4. 调用send方法,发送消息
  21. for (int i = 0; i < 10; i++) {
  22. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu"));
  23. }
  24. // 5. 关闭资源
  25. kafkaProducer.close();
  26. }
  27. }

副本故障处理

1.5.3 数据去重

1.5.3.1 数据传递语义

1.5.3.2 幂等性

        开启参数enable.idempotence 默认为true,false关闭。

1.5.3.3 生产者事务

        0.11版本的Kafka同时引入了事务的特性,为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

        为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

        就是引入一个全局唯一且一致的id,然后将id和pid绑定,从而使producer重启后,kafka集群依然可以通过id获得原来的pid。

        注意:开启事务,必须开启幂等性。

        一定要手动指定事务id:

1.5.4 数据有序

        分区内有序,分区之间无序:

1.5.5 数据乱序

        生产端的InFilghtRequests,默认每个broker最多缓存五个请求,当第一个数据发送过去,第二个数据没有发送成功,这时第二波数据就要进行重试,但是此时第三波数据发送,发送成功了,然后第二波数据的重试才发送成功,本来的数据顺序是123,但是现在被改为了132,发生了数据乱序。

        将max.in.flight.requests-per.connection设置为1,即不缓存request请求,自然不会发生数据乱序的情况。

        开启幂等性以后,因为SeqNumber是单调递增的,所以当数据是顺序的时候,不需要排序就可以发送,但是当发生上面的情况之后,服务端发现数据的SeqNumber是132,不是单调递增了,会对数据进行缓存,攒到5个以后会进行重新排序,之后再进行发送。

参考链接

【精选】Kafka基本原理详解_昙花逐月的博客-CSDN博客

这是最详细的Kafka应用教程了 - 掘金

Kafka : Kafka入门教程和JAVA客户端使用-CSDN博客

简易教程 | Kafka从搭建到使用 - 知乎

【精选】kafka简介_唏噗的博客-CSDN博客

Kafka 架构及基本原理简析

kafka是什么

再过半小时,你就能明白kafka的工作原理了

Kafka 设计与原理详解

Kafka【入门】就这一篇! - 知乎

kafka简介_kafka_唏噗-华为云开发者联盟

kafka详解

Kafka 设计与原理详解-CSDN博客

kafka学习知识点总结(三)

kafka——生产者原理解析_小波同学的技术博客_51CTO博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/80043
推荐阅读
相关标签
  

闽ICP备14008679号