当前位置:   article > 正文

第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据_kafka主题插入数据

kafka主题插入数据

3.1 消息传递模型

3.1.1 点对点模型

重要的特性:

    1. 消息通过 队列来进行交换
    1. 每条消息仅会传递给一个消费者
    1. 消息传递有先后顺序,消息被消费后从队列删除(除非使用了消息优先级)
    1. 生产者或者消费者可以动态加入

传送模型:

    1. 异步即发即弃:生产者发送一条消息,不会等待收到一个响应
    1. 异步请求、应答:生产者发送一条消息,阻塞等待应答队列,应答队列等待消费者响应

分类

    1. 单工通信:数据智能单向传输,有固定发送者和接收者。如:遥控器
    1. 半双工通信:数据可以双向传输,但不能同时。如:对讲机
    1. 全双工通信:数据可以同时双向传输。如:电话

3.1.2 发布、订阅模型

示例:
当我们在浏览视频或者博客论坛之类的网站时,遇到感兴趣的up主或者博主, 我们通常会选择去订阅他们的频道或者内容。 这样一来,每当他们发布一个新的内容, 网站平台方就会通过某种渠道来通知我们, 我们便可以在第一时间了解到这一讯息, 至于是否选择第一时间阅读, 则却决于我们自己,而这就是一个典型的订阅发布模式。

当使用点对点传输模型,当我们向了解特定信息,需要定期去信息平台去查看是否更新了新的内容,这样就会产生一些不必要的时间开支

重要的特性:

  1. 消息通过一个称为主题的虚拟通道进行交换。
  2. 每条消息都会传送给称为订阅者的多个消息消费者。订阅者有许多类型,包括持久性、非持久性和动态性。
  3. 发布者通常不会知道、也也意识不到哪一个订阅者正在接收主题消息。
  4. 消息被推送给消费者,这意味着消息会传送给消费者,而无需请求。消息通过一个称为主题的虚拟通道进行交换。主题就是生产者发布消息和订阅者消费消息的目的地。传送给一个主题的消息被自动推送给所有合格的消费者。
  5. 生产者和消费者之间没有耦合。订阅者和发布者可以在运行时动态添加,这使得系统的复杂性可以随时间的推移而增长。
  6. 订阅一个主题的每个客户端都会接收到发布该主题的消息副本。发布者生产的单条消息可以复制并分发给成百上千的订阅者。

3.1.3 主题模型

主题模型是一种更加灵活和强大的消息模型,它可以根据消息的主题进行过滤和匹配。在主题模型中,消息发送者将消息发布到一个交换机,而消费者通过创建绑定到该交换机的队列,并指定自己感兴趣的主题(由路由键来表示)。交换机根据消息的路由键将消息发送到匹配的队列中,只有订阅了相应主题的消费者才会接收到消息。

主题模型的特点是可以根据消息的主题进行灵活的消息过滤和匹配。这种模型适用于处理复杂的消息路由需求,可以根据多个属性、标签或关键字对消息进行分类和投递。例如,商品订阅系统可以根据用户的兴趣分类推送商品信息。

3.1.4 总结

  1. 点对点模型:适用于一对一的消息传递,具有高可靠性。

  2. 发布/订阅模型:适用于广播消息给多个消费者,实现消息的广播。

  3. 主题模型:适用于根据消息的主题进行灵活的过滤和匹配,处理复杂的消息路由需求。

3.2 kafka 术语

在这里插入图片描述

  1. topic: 发布订阅的对象称为主题
  2. producer:生产者,用于向一个或者多个主题发送消息
  3. consumer Group:消费者组,订阅主题并接收主题的消息,可以同时订阅多个主题
    • consumer:每个消费者属于一个特定的group
  4. broker: 物理概念,一个kafka集群通常由多个broker组成,broker用来接收和处理客户端请求,并对消息进行持久化处理。通常多个broker分布于多个机器用以提高系统可用性。
  5. replication:备份机制
    • replica: 副本,通常情况下被拷贝到不同的机架或者不同的机器
      -Leader replica: 领导者副本,对外提供服务,直接与客户端交互。生产者总是向领导者副本写消息,消费者总是向领导者副本读消息。
    • Follower replica: 跟随者副本,不对外提供服务,领导副本的追随者。只向领导者副本发送消息同步请求,领导者副本将最新消息发送给追随者副本之后,两者保持同步。
  6. partition:分区,一个主题可以划分多个分区。生产者生产的每一条消息只会出现在一个分区中。每个分区配置多个副本,通常一个领导者副本,多个追随者副本。
  7. offset: 又称consumer offset 消费者位移,每个消费者都有自己的位移,保存在broker内部的topic中

3.3 kafka 系统架构

在这里插入图片描述
三层消息架构

  • 主题层:每个主题设置M个分区,每个分区配置N个副本
  • 分区层:每个分区配置N个副本,只有一个领导者副本对外提供服务
  • 消息层:分区中包含多条消息,每条消息从位移0开始依次递增

客户端和服务器端的通信,是基于简单、高性能且与编程语言无关的TCP协议

3.4 kafka 生产者

在这里插入图片描述

kafka生产流程:

  1. 首先创建一个ProducerRecord对象,它需要包含目标主题和要发送的内容,还可以指定键或分区。在发送ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。(生产者的消息先被放到缓存里,然后使用单独的线程发送到服务器)

  2. 接下来,数据被传给分区器,如果之前在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据ProducerRecord对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息回被发送到相同的主题和分区上。有一个独立的线程负责把这些记录彼此发送到相应的broker上。

  3. 服务器在收到这些消息时会返回一个响应,如果消息成功写入kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则返回一个错误。生趁着在收到错误之后会尝试重新发送消息,几次之后如果还是返回失败,那么就返回错误信息。

3.5 编写生产者客户端

3.5.1 引入pom

  <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>2.4.1</version>
   </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

3.5.2 生产者代码

public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers","127.0.0.1:9092");
        prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("acks","all");
        prop.put("retries",0);
        prop.put("batch.size",16384);
        prop.put("linger.ms",1);
        prop.put("buffer.memory",33554432);
        String topic ="test";
        String sendValue = "发送消息";
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        producer.send(new ProducerRecord<String,String>(topic,Integer.toString(2),sendValue));
        producer.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

打开消费者客户端监听:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

参数说明:

  1. acks
  • 如果 acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

  • 如果 acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。

  • 如果 acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行(第 5 章将讨论更多的细节)。不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。

  1. buffer.memory
    设置生产者缓存区大小,生产者用它缓冲要发送到服务器的消息。如果应用发送速度过快,会导致生产者空间不足,这时候send() 要么被阻塞、要么抛出异常,取决于max.block.ms,表示抛出异常之前可以阻塞多久。

  2. compression.type
    默认情况,消息不会被压缩。该参数可以设置为 snappy、gzip 或 lz4,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Google 发明,它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

  3. retries
    生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。

  4. batch.size
    当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。

  5. linger.ms
    该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)

  6. client.id
    该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里

  7. max.in.flight.requests.per.connection
    该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

  8. imeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
    request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。

  9. max.block.ms
    该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

  10. max.request.size
    该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。

  11. receive.buffer.bytes 和 send.buffer.bytes
    这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

3.5.3 消费者代码

public static void main(String[] args) {
        try {
            Properties prop = new Properties();
            prop.put("bootstrap.servers","127.0.0.1:9092");
            prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("group.id","con-1"); // 消费者群组
            prop.put("auto.offset.reset","latest");
            //自动提交偏移量
            prop.put("auto.commit.intervals.ms","true");
            //自动提交时间
            prop.put("auto.commit.interval.ms","1000");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            ArrayList<String> topics = new ArrayList<>();
            //可以订阅多个消息
            topics.add("test");
            consumer.subscribe(topics);
            while(true){
                ConsumerRecords<String,String> poll = consumer.poll(Duration.ofSeconds(20));
                for(ConsumerRecord<String,String> consumerRecord :poll){
                    System.out.println(consumerRecord);
                }
            }
        }catch (Exception e) {
            System.out.println("error:" + e.getMessage());
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

打开生产者客户端发送消息:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

参数说明:

  1. fetch.min.bytes
    该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

  2. fetch.max.wait.ms
    我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 feth.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms。如果没有足够的数据流入“Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms 被设为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。

  3. max.partition.fetch.bytes
    该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。

  4. session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。把 session.timeout.ms 值设得比默认值小,可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。

  5. auto.offset.reset
    该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。

  6. enable.auto.commit
    我们稍后将介绍几种不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。

  7. partition.assignment.strategy
    我们知道,分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。
    8.client.id
    该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。

  8. max.poll.records
    该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。

  9. receive.buffer.bytes 和 send.buffer.bytes
    socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/614336
推荐阅读
相关标签
  

闽ICP备14008679号