当前位置:   article > 正文

Kafka安装配置及Java中的使用_java kafka

java kafka

目录

一、消息队列

二、流派分类:

三、Kafka基本介绍

四、主题和分区的概念

五、Kafka集群

六、kafka-clients之生产者

七、kafka-clients之消费者

八、SpringBoot使用Kafka

九、Kafka集群中的controller、rebalance、HW

(1)controller

(2)rebalance机制

(3)HW和LEO

十、Kafka问题优化

十一、Kafka Eagle监控平台(新版本待测试)


一、消息队列

消息队列(Message Queue,简称MQ),具体地说,是为了解决通信问题

  1. 同步:顺序执行,存在性能和稳定性的问题;

    • 问题一:系统开销大,响应时间长;

    • 问题二:执行过程中需要保证每个服务的顺利执行,用户体验较差;

  2. 异步:通过消息队列,进行异步处理;

    • 优势一:极大提高系统的吞吐量;

    • 优势二:即使执行失败,也可以使用分布式事务来保证最终是成功的(最终一致性);

二、流派分类:

主流MQ分为以下几个:

  • KafKa:目前性能最好、速度最快;

  • RocketMQ:阿里根据Kafka封装,功能性更强;

  • RabbitMQ:功能性强,模式多;

  • ZeroMQ:看重的是MQ的通信能力,基于Socket封装。

功能性区分:

  1. 有Broker:通常有一台服务器作为Broker,所有消息都通过它中转。生产者将消息发送给Broker,Broker则把消息主动推送给消费者。

    • 重Topic:Kafka、JMS(ActiveMQ),在消息队列中,Topic必须存在。

    • 轻Topic:RabbitMQ(AMQP),Topic只是其中的一种中转模式。

  2. 无Broker:ZeroMQ,认为MQ是一种更高级的Socket,是为了解决通信问题。故ZeroMQ被设计为了一个库,而不是中间件,ZeroMQ做的事情就是封装出了一套类似Socket的API去完成发送、读取消息。

三、Kafka基本介绍

  1. 官网地址:Apache Kafka

  2. 依赖准备:

    • 安装JDK:jdk1.8 +;

    • 安装Zookeeper:自己安装,或者使用Kafka安装包内自带的也可以;

  3. Kafka安装:

    • 官网下载:当前版本为:kafka_2.13-2.8.0.tgz;

    • 解压缩:tar -xzvf kafka_2.13-2.8.0.tgz

    • 目录说明:

      • bin:二进制启动文件;

      • config:相关配置文件;

      • libs:依赖的jar包;

      • licenses:许可证文件;

      • logs:某些日志文件;

      • site-docs:压缩文档参考;

    • 修改配置文件(config目录内):

      • zookeeper.properties:修改日志目录,改为自定义路径;

      • server.properties

        1. # 单节点使用默认值,集群修改为唯一id
        2. broker.id=0
        3. # 监听的kafka服务ip地址
        4. listeners=PLAINTEXT://localhost:9092
        5. # 消息存储日志文件
        6. log.dirs=D://IT/Kafka/tmp/kafka-logs
        7. # 连接的zk服务器ip地址
        8. zookeeper.connect=localhost:2181

    • 启动测试(win10启动):

      • 启动zk服务器:创建脚本启动,命名为 zookeeper.bat,并运行;

        start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties"

      • 启动Kafka服务器:创建脚本启动,命名为 kafka.bat,并运行;

        start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server.properties"

      • 校验是否启动成功:进入zk-Cli,查看是否有kafka的 broker.id 节点;

        ls /brokers/ids

  4. Kafka基本概念:

    名称解释
    Broker消息中间件处理节点,一个Kafka节点就是一个Broker,一个或多个组成一个Kafka集群
    TopicKafka根据Topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个Topic
    Producer消息生产者,向Broker发送消息的客户端
    Consumer消息消费者,从Broker读取消息的客户端
  5. 创建Topic:

    • 创建Topic:

      ./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo1

    • 修改Topic:

      1. # 修改分区数
      2. ./kafka-topics.bat --zookeeper localhost:2181 -alter --partitions 3 --topic demo1

    • 查询所有的Topic:

      ./kafka-topics.bat --list --zookeeper localhost:2181

    • 查询具体某个Topic详细信息:

      ./kafka-topics.bat --zookeeper localhost:2181 --topic demo1 --describe

  6. 发送消息:

    使用Kafka自带的生产者命令客户端,既可以从本地文件读取内容,也可以在命令行直接输入消息。默认情况下,每一行都是一条独立的消息。发送消息时,需要指定发送到具体哪个Kafka服务器和Topic名称

    ./kafka-console-producer.bat --broker-list localhost:9092 --topic demo1

  7. 消费消息:

    • 从头开始消费:消费全部消息。

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic demo1

    • 从最新的消息开始消费(默认):从最后一条消息的偏移量 + 1开始消费。

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo1

    • 注意:

      • 消息可被存储,且是顺序结构,通过偏移量offset来描述消息的有序性;

      • 消息消费时可以指定偏移量进行描述消费的消息位置;

  8. 消费组:多个消费者可以组成一个消费组。

    • 查询所有的消费组:

      ./kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list

    • 查询某个消费组中具体信息:

      ./kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group demoGroup1
      • current-offset:当前消费组已消费偏移量;

      • log-end-offset:消息总量(最后一条消息的偏移量);

      • lag:积压消息总量;

  9. 单播消息:一个生产者,一个消费组(Group)。同一消费组中,只有一个消费者能收到Topic中的消息

    • 消费组:

      • 消费者一:

        ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup --topic demo1

      • 消费者二:

        ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup --topic demo1

  10. 多播消息:一个生产者,多个消费组(Group)。每个消费组中,只有一个消费者能收到Topic中的消息

    • 消费组一 demoGroup1

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup1 --topic demo1

    • 消费组二 demoGroup2

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=demoGroup2 --topic demo1

四、主题和分区的概念

  1. 主题Topic

    • topic是一个逻辑的概念,Kafka通过Topic将消息进行分类,不同的topic将被订阅的消费组进行消费。

    • 当topic中的消息非常多,由于消息会保存在日志中,导致内存占用过大,由此提出分区Partition的概念。

  2. 分区Partition

    通过partition将一个topic中的消息进行分区存储。好处如下:

    • 分区存储,可以解决存储问题过大的问题;

    • 提升了消息日志读写的吞吐量,读和写可以在多个分区中同时进行;

  3. 消息日志文件分析(kafka-logs):

    • 0000000000.log:文件中保存的就是消息内容;

    • __consumer_offsets-x 文件夹:Kafka内部默认会创建50个 __consumer_offsets-x 分区(0-49),目的是为了存放消费者消费某个主题Topic的偏移量。当消费者消费完成后就会把偏移量上报给对应的主题Topic进行保存。目的是为了提升当前的Topic的并发性。

      • 消费完成提交至对应分区的计算方式:hash(消费组ID) % __consumer_offsets分区总数

      • 提交到主题内容的 Key 的方式:消费组ID + topic名称 + __consumer_offsets分区号

      • 提交到主题内容的 Value的方式:当前offset的值

    • 0000000000.index:日志文件的索引;

    • 0000000000.timeindex:日志文件按时间点的索引;

    • 日志文件,默认保存周期为七天,到期后自动删除。

五、Kafka集群

  1. 集群搭建(三个Broker):

    • 创建三个 server.properties 文件:

      • 第一个 server0.properties

        1. broker.id=0
        2. listeners=PLAINTEXT://localhost:9092
        3. log.dirs=D://IT/Kafka/tmp/kafka-logs-0
        4. zookeeper.connect=localhost:2181

      • 第二个 server1.properties

        1. broker.id=1
        2. listeners=PLAINTEXT://localhost:9093
        3. log.dirs=D://IT/Kafka/tmp/kafka-logs-1
        4. zookeeper.connect=localhost:2181

      • 第三个 server2.properties

        1. broker.id=2
        2. listeners=PLAINTEXT://localhost:9094
        3. log.dirs=D://IT/Kafka/tmp/kafka-logs-2
        4. zookeeper.connect=localhost:2181

    • 启动三个Broker:

      1. start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server0.properties"
      2. start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server1.properties"
      3. start cmd /k "cd D:\IT\Kafka\kafka_2.13-2.8.0&&bin\windows\kafka-server-start.bat .\config\server2.properties"

    • 使用zkCli客户端检测是否启动成功:

      ls /brokers/ids		# 启动成功显示三个broker的id

  2. 副本(replication-factor)的概念:

    • 副本:是为主题中的分区创建的备份,一般来说,有几个Broker就应该创建几个副本。其中,会选举出一个副本作为 leader,其他的则是 follower

      • leader:Kafka的读写操作,都发生在leader上。leader负责把数据同步至其他的follower,当leader宕机时,将会进行主从选举,选出一个新的leader。

      • follower:介绍leader的同步数据;

      • isr:可以同步和已同步的节点会存入 ISR 集合中,主从选举从 ISR 集合内选出leader。当节点的性能较差时,ISR 集合将会踢出该节点。

  3. 集群消费问题:

    • 向集群发送消息:

      ./kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic demo1

    • 从集群中消费消息:

      ./kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --consumer-property group.id=demoGroup --topic demo1

    • 分区消费的细节:

      • 一个分区partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性。多个分区partition的多个消费者的顺序性无法保证(后续有方法可以保证)。

      • 分区partition的数量决定了消费组中的消费者数量,建议消费组中的消费者数量不要超过分区partition的数量,防止多出的消费者消费不到消息造成资源的浪费。

      • 消费者宕机时,将会触发 rebalance 机制,选出其他消费者进行消费该分区的消息。

六、kafka-clients之生产者

  1. 引入依赖(版本号与Kafka版本一致):

    1. <!--kafka-clients-->
    2. <dependency>
    3. <groupId>org.apache.kafka</groupId>
    4. <artifactId>kafka-clients</artifactId>
    5. <version>2.8.0</version>
    6. </dependency>

  2. 简单测试:

    • 创建一个Topic:

      ./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-test-topic

    • 启动三台Broker作为集群

    • 代码测试:

      1. public class ProducerTest {
      2. private static final String TOPIC_NAME = "my-test-topic";
      3. private static final String BOOTSTRAP_SERVER = "localhost:9092,localhost:9093,localhost:9094";
      4. private static final String KEY = "test:";
      5. private static final String VALUE = "This is a test for Kafka-producer!";
      6. private static final Properties prop = new Properties();
      7. static {
      8. // Kafka集群IP
      9. prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
      10. // ACK:消息持久化配置(-1、0、1)
      11. prop.put(ProducerConfig.ACKS_CONFIG, "1");
      12. // 重试次数配置
      13. prop.put(ProducerConfig.RETRIES_CONFIG, "3");
      14. // 重试间隔配置,单位:ms
      15. prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
      16. // 缓冲区大小,32Mb
      17. prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 32);
      18. // 发送消息大小,16Kb
      19. prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 16);
      20. // 未能拉取到16Kb数据,间隔 10ms 后,立即发送
      21. prop.put(ProducerConfig.LINGER_MS_CONFIG, 10);
      22. // key的序列化方式
      23. prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      24. // value的序列化方式
      25. prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      26. }
      27. @Test
      28. public void test1() throws Exception {
      29. // 1.创建消息生产者对象
      30. Producer<String, String> producer = new KafkaProducer<>(prop);
      31. for (int i = 0; i < 6; i++) {
      32. // 2.封装消息载体对象
      33. // 分区计算方式:hash(key) % partitionNum,即:key的哈希值 取余 分区的总数
      34. ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, KEY + UUID.randomUUID(), VALUE);
      35. // 自定义发送的分区位置
      36. // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2, KEY + UUID.randomUUID(), VALUE);
      37. // 3.同步发送消息
      38. Future<RecordMetadata> future = producer.send(record);
      39. // 4.获取消息发送成功后的元数据
      40. RecordMetadata metadata = future.get();
      41. System.out.println("主题Topic名称:" + metadata.topic());
      42. System.out.println("保存partition分区的位置:" + metadata.partition());
      43. System.out.println("总消息数(offset偏移量):" + metadata.offset());
      44. }
      45. producer.close();
      46. }
      47. @Test
      48. public void test2() {
      49. // 1.创建消息生产者对象
      50. Producer<String, String> producer = new KafkaProducer<>(prop);
      51. // 2.封装消息载体对象
      52. // 分区计算方式:hash(key) % partitionNum,即:key的哈希值 取余 分区的总数
      53. ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, KEY + UUID.randomUUID(), VALUE);
      54. // 自定义发送的分区位置
      55. // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2, KEY + UUID.randomUUID(), VALUE);
      56. // 3.异步发送消息
      57. producer.send(record, (data, exception) -> {
      58. // 4.消息发送异常,要做的事
      59. if (exception != null) {
      60. System.out.println("发送失败,异常原因:" + exception.getMessage());
      61. }
      62. // 5.消息发送成功要做的事
      63. if (data != null) {
      64. System.out.println("主题Topic名称:" + data.topic());
      65. System.out.println("保存partition分区的位置:" + data.partition());
      66. System.out.println("总消息数(offset偏移量):" + data.offset());
      67. }
      68. });
      69. // 延时等待,异步消息进程执行结束
      70. ThreadUtils.sleep(1000);
      71. producer.close();
      72. }
      73. }
      • 同步发送消息:如果发送消息后没有收到 ack,生产者将阻塞等待 3 秒,之后会进行重试,重试三次后仍然失败,则抛出异常。执行慢,但是不会丢失消息;

      • 异步发送消息:生产者发送完成后就可以执行后续业务,broker收到消息后,进行异步调用生产者提供的 callback 回调方法。当网络异常时可能出现回调方法未执行的问题,即消息丢失;

  3. 消息持久化机制参数(ACK配置,同步发送时用到 ACK 配置)

    • acks=0:发送消息后无需等待broker进行确认,就能发送下一条。性能最高,消息最容易丢失;

    • acks=1(默认值):至少需要等待 leader 成功写入本地日志,才能发送下一条。如果此时 leader 宕机,则会发生消息丢失;

    • acks=-1/all:需要等待多个节点写入日志(在 min.insync.replicas 中进行设置,默认值为1,推荐大于等于 2),才能发送下一条。只要有一个备份存活就不会丢失消息。金融级别的常用配置,性能最差,安全性最高;

    • 其他配置:

      • 重试次数配置

      • 重试间隔配置

    • 配置相关代码:

      1. // ACK:消息持久化配置(-1、0、1)
      2. prop.put(ProducerConfig.ACKS_CONFIG, "1");
      3. // 重试次数配置
      4. prop.put(ProducerConfig.RETRIES_CONFIG, "3");
      5. // 重试间隔配置,单位:ms
      6. prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");

  4. 消息缓冲区配置:

    • Kafka会创建一个消息缓冲区,存放要发送的消息,缓冲区大小为 32Mb;

      1. // 缓冲区大小,32Mb
      2. prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 32);

    • Kafka本地线程会去缓冲区拉取数据,发送至 Broker,一次拉取 16Kb;

      1. // 发送消息大小 16Kb
      2. prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 16);

    • 如果线程拉取不到 16Kb 的数据,间隔 10ms 后,也会将拉取的数据发送至Broker;

      1. // 未能拉取到16Kb数据,间隔 10ms 后,立即发送
      2. prop.put(ProducerConfig.LINGER_MS_CONFIG, 10);

七、kafka-clients之消费者

  1. 引入依赖(版本号与Kafka版本一致):

    1. <!--kafka-clients-->
    2. <dependency>
    3. <groupId>org.apache.kafka</groupId>
    4. <artifactId>kafka-clients</artifactId>
    5. <version>2.8.0</version>
    6. </dependency>

  2. 简单测试:

    1. public class ConsumerTest {
    2. private static final String TOPIC_NAME = "my-test-topic";
    3. private static final String BOOTSTRAP_SERVER = "localhost:9092,localhost:9093,localhost:9094";
    4. private static final String CONSUMER_GROUP_NAME = "test-group-0";
    5. private static final Properties prop = new Properties();
    6. static {
    7. // Kafka集群IP
    8. prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
    9. // 消费组的名称
    10. prop.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
    11. // key的反序列化方式
    12. prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    13. // value的反序列化方式
    14. prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    15. }
    16. @Test
    17. public void test1() {
    18. // 1.创建消费者对象
    19. Consumer<String, String> consumer = new KafkaConsumer<>(prop);
    20. // 2.订阅Topic主题的消息列表
    21. consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    22. // 3.长轮询进行监听消息
    23. while (true) {
    24. // 4.拉取消息列表
    25. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    26. // 5.循环处理每一条消息
    27. for (ConsumerRecord<String, String> record : records) {
    28. System.out.println("主题Topic名称:" + record.topic());
    29. System.out.println("分区partition的位置:" + record.partition());
    30. System.out.println("消息offset偏移量:" + record.offset());
    31. System.out.println("消息的键key:" + record.key());
    32. System.out.println("消息的值value:" + record.value());
    33. }
    34. }
    35. }
    36. }

  3. 消费者提交 offset:

    • 提交的内容:所属的消费组 + 消费的 Topic + 消费的 partition + 偏移量 offset;

    • 自动提交:消息 poll 之后,消费消息之前进行 offset 提交。当消费者宕机时,会发生消息丢失问题;

      1. // 是否开启自动提交 offset,默认为 true
      2. prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
      3. // 自动提交 间隔时间
      4. prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

    • 手动提交:消息进行消费后,手动提交 offset;

      1. // 是否开启自动提交 offset,默认为 true
      2. prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
      • 同步提交(更推荐使用):

        1. // 拉取消息列表
        2. Duration delay = Duration.ofMillis(1000);
        3. ConsumerRecords<String, String> records = consumer.poll(delay);
        4. if (records.count() > 0) {
        5. // 6.手动同步提交,等待Broker返回ACK才会执行后续业务,否则阻塞等待
        6. try {
        7. consumer.commitAsync();
        8. } catch (Exception e) {
        9. // 提交失败执行的逻辑
        10. e.printStackTrace();
        11. }
        12. }

      • 异步提交:

        1. // 拉取消息列表
        2. Duration delay = Duration.ofMillis(1000);
        3. ConsumerRecords<String, String> records = consumer.poll(delay);
        4. if (records.count() > 0) {
        5. // 6.手动异步提交,无需返回ACK,提交成功后异步调用回调方法
        6. consumer.commitAsync((offset, exception) -> {
        7. // 提交失败执行的逻辑
        8. if (exception != null) {
        9. System.out.println("手动提交异常:" + exception.getMessage());
        10. }
        11. // 提交成功执行的逻辑
        12. if (CollUtil.isNotEmpty(offset)) {
        13. offset.entrySet().forEach(System.out::println);
        14. }
        15. });
        16. }

  4. 消费者长轮询 poll 的配置:

    • 单次拉取条数:默认500条;

      1. // 单次拉取的消息条数
      2. prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

    • 单次轮询的时间:poll方法的入参,此处为 1000 毫秒;

      1. Duration delay = Duration.ofMillis(1000);
      2. ConsumerRecords<String, String> records = consumer.poll(delay);
      • poll的机制:在轮询内时,重复拉取消息,直至拉取 500 条消息,或超过轮询时长停止;

        • 当首次拉取消息,够了 500 条,则停止拉取,执行后续业务逻辑;

        • 首次没有拉取到 500 条,进行重复拉取,直至 拉取够,或超过轮询时长停止;

        • 若多次拉取都不够 500 条,且超过了轮询时长,则停止拉取,执行后续业务逻辑;

      • 当前后两次轮询的间隔时间超过了 30 秒,集群将判定此消费者能力弱,并踢出消费组,并处罚 rebalance 机制, rebalance 机制会造成性能开销。可以修改如下配置,提升消费者的速度;

        1. // 单次拉取的消息条数,默认500
        2. prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
        3. // 单次长轮询的间隔时间,默认 1000 ms
        4. prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5 * 1000);

  5. 消费者的健康状态检测配置:

    1. // 消费者发送心跳间隔时长,心跳频率
    2. prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
    3. // 心跳超时将触发 rebalance机制,并踢出消费组 的超时时长
    4. prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

  6. 消费组指定partition分区进行消费:

    1. // 参数一,Topic名称。参数二,分区位置
    2. TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
    3. consumer.assign(Collections.singletonList(topicPartition));

  7. 消费者的消息回溯:指定分区位置,并从头开始消费;

    1. // 参数一,Topic名称。参数二,分区位置
    2. TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
    3. consumer.assign(Collections.singletonList(topicPartition));
    4. consumer.seekToBeginning(Collections.singletonList(topicPartition));

  8. 指定 offset 位置消费:

    1. // 指定offset位置进行消费
    2. consumer.seek(topicPartition, new OffsetAndMetadata(10));

  9. 指定时间点,获取 offset 进行消费:

    1. // 获取此Topic的全部分区
    2. List<PartitionInfo> infos = consumer.partitionsFor(TOPIC_NAME);
    3. // 消费当前时间前一天的消息
    4. long time = new Date().getTime() - 24 * 60 * 60 * 1000;
    5. // 封装分区和消费时间的参数
    6. HashMap<TopicPartition, Long> map = new HashMap<>();
    7. for (PartitionInfo info : infos) {
    8. map.put(new TopicPartition(TOPIC_NAME, info.partition()), time);
    9. }
    10. // 根据时间节点,找到消息偏移量 offset
    11. Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);
    12. for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
    13. TopicPartition key = entry.getKey();
    14. OffsetAndTimestamp value = entry.getValue();
    15. // 进行消息订阅,指定消费的偏移量 offset
    16. consumer.assign(Collections.singletonList(key));
    17. consumer.seek(key, new OffsetAndMetadata(value.offset()));
    18. }

  10. 新消费组的消费 offset 规则:

    当创建新的消费组启动消费者时,默认只会消费最新的消息。通过修改以下配置,使消费者首次启动时,从头开始消费,后续启动从最新消息开始消费。

    1. // 创建新的消费组时的消费规则,默认latest。latest:从最新开始 / earliest:从头开始
    2. prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    • latest:从最新开始,只消费最新的消息;

    • earliest:首次启动从头开始,后续启动从最新开始;

    • 区别: seekToBeginning() 方法是每次启动都从头开始消费,earliest 只有首次启动从头开始,后续启动从最新开始;

八、SpringBoot使用Kafka

  1. 引入依赖:

    1. <dependency>
    2. <groupId>org.springframework.kafka</groupId>
    3. <artifactId>spring-kafka</artifactId>
    4. </dependency>

  2. 配置文件:

    1. spring:
    2. kafka:
    3. bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    4. producer:
    5. acks: 1 # 0:无需ack确认;1:broker写入 leader日志后返回ack;-1:broker写入多个副本日志后返回ack
    6. retries: 3 # 重试次数
    7. batch-size: 16384 # 单次发送消息大小 16Kb
    8. buffer-memory: 33554432 # 缓存区大小 32Mb
    9. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    10. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    11. consumer:
    12. group-id: test-group-0
    13. max-poll-records: 500 # 单次拉取消息条数
    14. enable-auto-commit: false # 消费后是否自动提交
    15. auto-offset-reset: earliest # 新消费组消费策略,earliest:首次从头消费,后续从最新消息消费
    16. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    17. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    18. listener:
    19. ack-mode: manual # 手动提交,监听器处理一次轮询的消息后(默认500条)调用ack.acknowledge()进行提交
    20. # ack-mode: manual_immediate # 手动提交,监听器处理单条消息后调用ack.acknowledge()进行提交

  3. 消息生产者:

    1. @RestController
    2. @RequestMapping("/kafka")
    3. public class KafkaController {
    4. private static final String TOPIC_NAME = "my-test-topic";
    5. private static final String KEY = "test:";
    6. private static final String VALUE = "This is a test for Kafka-producer!";
    7. @Autowired
    8. private KafkaTemplate<String, String> kafkaTemplate;
    9. @GetMapping("/send")
    10. public AjaxResult sendMessage() {
    11. ProducerRecord<String, String> record =
    12. new ProducerRecord<>(TOPIC_NAME,
    13. KEY + UUID.randomUUID(),
    14. VALUE);
    15. // 返回值是Future
    16. kafkaTemplate.send(record);
    17. return AjaxResult.success();
    18. }
    19. }

  4. 消费者监听:

    1. @Component
    2. public class MyConsumer {
    3. private static final String TOPIC_NAME = "my-test-topic";
    4. private static final String CONSUMER_GROUP_NAME_0 = "test-group-0";
    5. /**
    6. * 自动监听是否有消息
    7. *
    8. * @param record 可以是 ConsumerRecords,也可以是 ConsumerRecord,前者一次处理多条消息,后者一次处理一条消息
    9. * @param ack 当关闭自动提交时,需要使用此参数进行 ack 确认提交
    10. */
    11. @KafkaListener(topics = TOPIC_NAME, groupId = GROUP_NAME)
    12. public void listenMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    13. System.out.println(record);
    14. ack.acknowledge();
    15. }
    16. }

  5. 消费者配置主题Topic、分区partition、偏移量offset:

    1. /**
    2. * 创建一个消费组,包含 3 个消费者(concurrency),持续监听两个Topic的消息。
    3. * 第一个Topic,监听分区(0、1),分区 0的偏移量初始为 5;分区 1的偏移量初始为 10。
    4. * 第二个Topic,监听分区(1、2),分区 1的偏移量初始为 15;分区 2的偏移量初始为 20。
    5. *
    6. * @param record 可以是 ConsumerRecords,也可以是 ConsumerRecord,前者一次处理多条消息,后者一次处理一条消息
    7. * @param ack 当关闭自动提交时,需要使用此参数进行 ack 确认提交
    8. */
    9. @KafkaListener(groupId = GROUP_NAME, concurrency = "3", topicPartitions = {
    10. @TopicPartition(topic = TOPIC_NAME_1, partitions = {"0", "1"}, partitionOffsets = {
    11. @PartitionOffset(partition = "0", initialOffset = "5"),
    12. @PartitionOffset(partition = "1", initialOffset = "10")
    13. }),
    14. @TopicPartition(topic = TOPIC_NAME_2, partitions = {"1", "2"}, partitionOffsets = {
    15. @PartitionOffset(partition = "1", initialOffset = "15"),
    16. @PartitionOffset(partition = "2", initialOffset = "20")
    17. })
    18. })
    19. public void listenMessage2(ConsumerRecord<String, String> record, Acknowledgment ack) {
    20. System.out.println(record);
    21. ack.acknowledge();
    22. }

九、Kafka集群中的controller、rebalance、HW

(1)controller
  1. 选举机制:使用zk的机制,当 broker 创建时,会在 zookeeper 中创建一个临时序号节点,序号最小的节点代表的 broker 将作为集群中的 controller。

  2. 作用

    • 当集群中某一个副本的 leader 宕机,需要在集群中选出一个新的 leader,选举的规则是 ISR 集合中最左边的元素(ISR 集合会按照性能排序,性能越好越靠前);

    • 当集群中有 broker 的增加或减少,controller 会同步信息给其他的 broker;

    • 当集群中有 partition 分区的增加或减少,controller 会同步信息给其他的 broker;

(2)rebalance机制
  1. rebalance 的前提:当消费组中的消费者没有指定分区进行消费,由 Kafka 决定消息分区的分配。

  2. 触发 rebalance 的条件:当消费组中的消费者和分区的关系发生变化时;

  3. 分区分配策略(rebalance之前,分区分配有三种策略):

    • range:根据公式计算消费者消费的分区。

    • 轮询:分区逐一分配到消费者上。

    • sticky:粘合策略。如果需要rebalance,会在已分配的基础上进行调整,而不会影响之前的分配情况。建议开启此策略,因为 rebalance 机制会重新分配会造成资源浪费;

(3)HW和LEO
  1. HW:Height Water,称为高水位。它是 Broker 已完成同步的分界线,当消息写入 Broker,并同步到所有副本之后,HW才会变化。HW变化之前,消费者无法消费到未同步完成的消息,这么做的目的是为了防止消息丢失

  2. LEO:Log End Offset,指的是某个副本的最后一条消息的位置。

  3. 关系图如下:

十、Kafka问题优化

  1. 如何防止消息丢失?

    • 生产者:

      • 发送消息时,使用同步发送的方式;

      • 设置 ACK 的级别,设为1 或 -1即可,-1时可以做到99.99%防丢失率,需要修改 min.insync.replicas 分区备份数,推荐大于等于 2;

    • 消费者:拉取消息后的自动提交,修改为手动提交;

  2. 如何防止重复消费?

    • 生产者:

      • 关闭 retry 重试:不推荐,这样做可能导致消息丢失;

    • 消费者:

      • 开启自动提交:不推荐,这样做可能导致消息丢失;

      • 保证消费的幂等性(指多次访问结果一致):

        • 方式一:使用 redis / zk 分布式锁,主流方案,推荐使用;

        • 方式二:创建联合主键,保证消息的唯一性,防止插入多条记录;

  3. 如何做到顺序消费?

    • 生产者:

      • 使用同步发送,且 ACK 不为 0(0会导致消息丢失),确保消息发送顺序是正确的;

    • 消费者:

      • 主题 Topic 只能设置一个分区 Partition,且消费组只能有一个消费者;

    • Kafka的顺序消费会牺牲性能,可以考虑使用 RocketMQ 代替 ;

  4. 如何处理消息积压?

    • 消息积压的原因:由于消费的速度远赶不上生产的速度,导致

    • 解决方案:

      • 消费者使用多线程,充分利用机器的性能;

      • 优化业务架构,提升业务层的消费速度;

      • 创建多个消费组,多个消费者,提升消费者的速度;

      • 消息分发:创建一个消费者,进行转发消息,接收方为新的 Topic ,且配置了多个分区及多个消费者进行消费(不常用)。

  5. 如何实现延时队列?

    • 场景:创建订单后,如果 30 分钟未支付,则取消订单。

    • 解决方案:

      • 单独创建相应的主题;

      • 消费者消费该主题的消息(轮询);

      • 消费前进行判断,当前时间是否与消息的创建时间相差 30 分钟,且未支付;

        • 如果是:修改数据库状态为取消订单;

        • 如果否:记录当前 offset,且不再继续消费之后的消息。等待一分钟后,根据记录的 offset 拉取消息,继续进行判断。

十一、Kafka Eagle监控平台(新版本待测试)

未完待续...

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号