当前位置:   article > 正文

Kafka快速复习_kafka复习

kafka复习

Kafka快速复习

1.相关概念

Kafka传 统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
Kafka最新定义 : Kafka是 一个开源的 分 布式事件流平台 (Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
消息队列 在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

2.消息队列的应用场景

传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。
缓冲/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
在这里插入图片描述
异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

3.消息队列的两种模式

1)点对点模式

• 消费者主动拉取数据,消息收到后清除消息
在这里插入图片描述

2)发布/订阅模式

可以有多个topic主题(浏览、点赞、收藏、评论等)
消费者消费数据之后,不删除数据
每个消费者相互独立,都可以消费到数据
在这里插入图片描述

4 Kafka 基础架构

在这里插入图片描述
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
一台 Kafka 服务器就是一个 broker。
一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader (主)和若干个Follower(从主发生故障时变为主)。

二.Kafka常用命令

1.主题命令行操作

参数描述
–bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic>操作的 topic 名称。
–create创建主题。
–delete删除主题。
–alter修改主题。
–list查看所有主题。
–describe查看主题详细描述。
–partitions <Integer: # of partitions>设置分区数。
–replication-factor<Integer: replication factor>设置分区副本。
–config <String: name=value>更新系统默认的配置。
1.查看操作主题命令参数
 bin/kafka-topics.sh
  • 1
2.查看当前服务器中的所有 topic
 bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
  • 1
3.创建 first topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 
3 --topic first
  • 1
  • 2

–topic 定义 topic 名
–replication-factor 定义副本数
–partitions 定义分区数

4.查看 first 主题的详情
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
  • 1

修改分区数(注意:分区数只能增加,不能减少)

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
  • 1

删除 topic

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
  • 1

2 生产者命令行操作

查看操作生产者命令参数
 bin/kafka-console-producer.sh
  • 1
参数描述
–bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic>操作的 topic 名称。
发送消息
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
  • 1

3.消费者命令行操作

参数描述
–bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic>操作的 topic 名称。
–from-beginning从头开始消费。
–group <String: consumer group id>指定消费者组名称。

消费消息

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
  • 1
  • 2

三Kafka 生产者

1 发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
在这里插入图片描述

2.生产者重要参数列表

在这里插入图片描述

3 异步发送 API

package com.example.kafka2;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

import java.util.Properties;

/**
 * @Author: lx
 * @CreateTime: 2023-03-06  11:40
 * @Description: TODO
 */
public class ProducerConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
//        设置 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "121.41.90.173:9092");
//        设置 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//        1.创建kafka生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//        2.发送数据
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<String, String>("second", "my-key", "my-value"));
        }
//        3.关闭资源
        producer.close();
    }
    /**
     * @description: 异步有回调函数
     * @author: lmk
     * @date: 2023/3/6 13:32
     * @param: []
     * @return: void
     **/
    @Test
    public void test() {
        Properties properties = new Properties();
//        设置 bootstrap.servers

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "121.41.90.173:9092");
//        设置 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 根据传递的value 使用自定义分区
       // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.kafka2.MyPartitioner");
//        1.创建kafka生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//        2.发送数据
        for (int i = 0; i < 5; i++) {
            //
            //同步发送 producer.send().get();
            producer.send(new ProducerRecord<String, String>("first", "my-key", "my-value"), new Callback() {
                /**
                 * @date: 2023/3/6 13:34
                 * @param: [recordMetadata 元数据信息, e 为 null,说明消息发送成功]
                 * @return: void
                 **/
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("success");
                    }else {
                        System.out.println("failure"+e);
                    }
                    //first 0:元数据信息
                    System.out.println(recordMetadata.topic()+" "+recordMetadata.partition()+":元数据信息");
                }
            });
        }
//        3.关闭资源
        producer.close();
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

在这里插入图片描述

消息发送失败会自动重试,不需要我们在回调函数中手动重试。

4 生产者分区

好处
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
提高并行度
生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
生产者发送消息的分区策略

(1)指明partition的情况下,直接将指明的值作为partition值;
kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i), new Callback() {
  • 1
(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
 producer.send(new ProducerRecord<String, String>("first", "my-key", "my-value"),
  • 1
(3)既没有partition值又没有key值的情况下,随机选分区,分区或时间到满选下一个

5.自定义分区器

// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.lx.kafka.producer.MyPartitioner");
  • 1
  • 2
package com.example.kafka2;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @Author: lx
 * @CreateTime: 2023-03-06  13:58
 * @Description: TODO 自定义kafka分区 使其发送到指定分区
 */
public class MyPartitioner implements Partitioner {
    /**
     * @description:
     * @author: lx
     * @date: 2023/3/6 13:59
     * @param: [s topic, o key, keybytes, o1 value, value bytes1, cluster]
     * @return: int  返回几号分区
     **/
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        //获取数据
        String value = o.toString();
        if (value.contains("lx_yyds")) {
            return  0;
        }else if(value.contains("lx_123")){
            return 1;
        }else {
            return 2;
        }

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

6 生产者如何提高吞吐量

• batch.size:批次大小,默认16k
• linger.ms:等待时间,修改为5-100ms
• compression.type:压缩snappy
• RecordAccumulator:缓冲区大小,修改为64m

// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 // batch.size:批次大小,默认 16K
 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 // linger.ms:等待时间,默认 0
 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
 // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
 // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
7 生数据可靠性

ack 应答
讨论 leader挂 follower挂(follow一直同步不上)
分区副本 即leader和follow 所以至少有俩副本
在这里插入图片描述
在这里插入图片描述

可靠性总结:

acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

代码配置
// 设置 acks
 properties.put(ProducerConfig.ACKS_CONFIG, "all");
 // 重试次数 retries,默认是 int 最大值,2147483647
 properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  • 1
  • 2
  • 3
  • 4

8.数据去重

至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2,保证数据不丢失
最多一次(At Most Once)= ACK级别设置为0 保证数据不重复
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条。
保证的是在单分区单会话内不重复
PID是Kafka每次重启都会分配一个新的且自增
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
使用幂等性

开启参数 enable.idempotence 默认为 truefalse 关闭
  • 1

9.生产者事务

开启事务,必须开启幂等性。
在这里插入图片描述

package com.example.kafka2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @Author: lx
 * @CreateTime: 2023-03-28  15:56
 * @Description: TODO
 */
public class CustomProducerTransactions {
    public static void main(String[] args) throws
            InterruptedException {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 设置事务 id(必须),事务 id 任意起名
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                "transaction_id_0");
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<String, String>(properties);
        // 初始化事务
        kafkaProducer.initTransactions();
        // 开启事务
        kafkaProducer.beginTransaction();
        try {
            // 4. 调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                // 发送消息
                kafkaProducer.send(new ProducerRecord<>("first",
                        "atguigu " + i));
            }
// int i = 1 / 0;
            // 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            // 终止事务
            kafkaProducer.abortTransaction();
        } finally {
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

10.数据顺序序

在这里插入图片描述
在这里插入图片描述

四.Kafka 消费者

1 Kafka 消费方式

在这里插入图片描述

2 Kafka 消费者工作流程

在这里插入图片描述

Consumer Group(CG):

消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
即消费者组是逻辑上的一个订阅者
在这里插入图片描述
在这里插入图片描述

3 消费者 API

在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。

消费者重要参数
参数名称描述
bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id标记消费者所属的消费者组。
enable.auto.commit默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
auto.offset.reset没有初始偏移量时 earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets 的分区数,默认是 50 个分区。
heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。
session.timeout.msKafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。
max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条。
简单代码
package com.example.kafka2;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @Author: lx
 * @CreateTime: 2023-03-06  15:50
 * @Description: TODO
 */
public class CustomerConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
//        连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "121.41.90.173:9092");
//        反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//        配置消费者 组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//  1.创建消费者
        KafkaConsumer<String, String> co = new KafkaConsumer<>(properties);
//  2.订阅主题
        List<String> topics =new ArrayList<>();
        // 定义指定分区
//        List<TopicPartition> topics =new ArrayList<>();
//        topics.add(new TopicPartition("second",1));
//        co.assign(topics);
        topics.add("first");
        co.subscribe(topics);
//  3.消费数据
//        while(true){
//            1秒钟拉取的数据
            ConsumerRecords<String, String> poll = co.poll(Duration.ofSeconds(1000));
            poll.forEach(System.out::println);
//        }
//
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
消费某个分区
// 消费某个主题的某个分区数据
 ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
 topicPartitions.add(new TopicPartition("first", 0));
 kafkaConsumer.assign(topicPartitions);
  • 1
  • 2
  • 3
  • 4

4.分区的分配以及再平衡

一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据?
Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。
默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。
在这里插入图片描述

Range 以及再平衡

假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。
partitions/consumer 有余数前几个消费者会多消费。
注:如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic。
分区数可以增加,但是不能减少。

原理
当一N个消费者挂掉 则会被踢出且其数据会平均分配到其它消费者里面

RoundRobin 以及再平衡

原理
先是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
  • 1
  • 2

当一N个消费者挂掉,则会重新按照RoundRobin策略进行消费

Sticky 以及再平衡

在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
  • 1
  • 2
  • 3
  • 4

N个消费者的任务会按照粘性规则,尽可能均衡的随机分成N分给其它消费者消费。

.5 offset 位移

offset的默认位置

在这里插入图片描述
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

自动提交 offset

enable.auto.commit:是否开启自动提交offset功能,默认是true
auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
在这里插入图片描述

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
 // 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
  • 1
  • 2
  • 3
  • 4

不建议使用。
手动提交
commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

// 同步提交 offset
 consumer.commitSync();
 // 异步提交 offset
 consumer.commitAsync();
  • 1
  • 2
  • 3
  • 4

异步方式使用较多。

任意指定 offset 位置开始消费

(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

Set<TopicPartition> assignment= new HashSet<>();
 while (assignment.size() == 0) {
 kafkaConsumer.poll(Duration.ofSeconds(1));
 // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
 assignment = kafkaConsumer.assignment();
 }
 // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
 for (TopicPartition tp: assignment) {
 kafkaConsumer.seek(tp, 1700);
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
指定时间消费
Set<TopicPartition> assignment = new HashSet<>();
 while (assignment.size() == 0) {
 kafkaConsumer.poll(Duration.ofSeconds(1));
 // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
 assignment = kafkaConsumer.assignment();
 }
 HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
 // 封装集合存储,每个分区对应一天前的数据
 for (TopicPartition topicPartition : assignment) {
 timestampToSearch.put(topicPartition, 
System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
 }
 // 获取从 1 天前开始消费的每个分区的 offset
 Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
 // 遍历每个分区,对每个分区设置消费时间。
 for (TopicPartition topicPartition : assignment) {
 OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
 // 根据时间指定开始消费的位置
 if (offsetAndTimestamp != null){
 kafkaConsumer.seek(topicPartition, 
offsetAndTimestamp.offset());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
漏消费和重复消费

重复消费:已经消费了数据,但是 offset 没提交。 即消费了时间没到还没提交就挂了。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
在这里插入图片描述

6 消费者事务

在这里插入图片描述

7 数据积压

在这里插入图片描述
fetch.max.bytes 默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值
(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条

五.SpringBoot集成kafka

SpringBoot可以用于 Kafka 的生产者,也可以用于 SpringBoot 的消费者。

1.环境配置

maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>top.remained</groupId>
    <artifactId>kafka3</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka3</name>
    <description>kafka3</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <repositories>
        <repository>
            <id>aliyun-releases</id>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
application.yml
# 应用服务 WEB 访问端口
server.port=8080
# 应用名称
spring.application.name=springboot_kafka
#springboot 生产者
# 指定 kafka 的地址
spring.kafka.bootstrap-servers=127.0.0.1:9092
#指定 key 和 value 的序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 指定 key 和 value 的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=group
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

2.SpringBoot 做生产者

从浏览器端接收到数据并发送给消费者

@RestController
public class ProducerController {
    @Autowired
    KafkaTemplate<String,String> kafkaTemplate;
//  从浏览器中获取数据并发送到消费者
    @RequestMapping("/producer")
    public String getProducer(String producer) throws ExecutionException, InterruptedException {

        ListenableFuture<SendResult<String, String>> first = kafkaTemplate.send("first", producer);
        System.out.println(first.get());
        return "OK";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3.SpringBoot做消费者

@Configuration
public class KafkaConsumer {
    @KafkaListener(topics = "first")
    public void consumeTopic(String msg){
        System.out.println(msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

说明
本文参考尚硅谷的文档,有兴趣的可以去观看(挺不错的)。
随谈
管他那么多干嘛,上天安排的最大。
最近明白了一个问题,特爱走小路的我发现走小路有一个极端问题,不仅路不好走而且小路很多时候就会走着走着没路了。有大路时,别人劝你走大路时,千万要““猥琐发育,别浪””,因为你走小路想回头却已迷失了方向。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/94526
推荐阅读
相关标签
  

闽ICP备14008679号