当前位置:   article > 正文

Springboot整合kafka简单使用_springboot kafka

springboot kafka

kafka

一,介绍

Kafka 是一个开源的分布式流处理平台,最初由 LinkedIn 开发并贡献给 Apache 软件基金会。它设计用于构建高性能、持久性、可伸缩和容错的实时数据管道和流处理应用程序。

以下是 Kafka 的一些关键特点和概念:

  1. 发布-订阅模型:Kafka 使用发布-订阅模型,其中消息生产者将消息发布到主题(topic),而消息消费者从主题订阅消息。这种模型可以支持多个消费者同时消费消息,并且具有良好的扩展性。
  2. 主题(Topic):主题是消息的逻辑分类,相当于一个消息队列。消息被发布到主题,并且消费者从主题订阅消息。
  3. 分区(Partition):每个主题可以划分为一个或多个分区,每个分区是一个有序的日志。分区可以分布在集群的不同节点上,以提供水平扩展和负载均衡。
  4. 副本(Replication):Kafka 支持将每个分区的数据复制到多个副本,以提供容错性和数据可靠性。副本位于集群的不同节点上,确保即使某个节点故障,数据仍然可用。
  5. 生产者(Producer):生产者负责将消息发布到主题。它们可以选择将消息发送到特定的分区,也可以根据负载均衡策略将消息均匀分布到不同的分区中。
  6. 消费者(Consumer):消费者从主题订阅消息,并按照其偏移量(offset)顺序消费消息。消费者组(Consumer Group)是一组共享相同主题的消费者,它们协调以确保每个分区的消息只有一个消费者消费。
  7. ZooKeeper:Kafka 使用 ZooKeeper 来进行分布式协调和管理,如协调生产者和消费者、维护集群元数据等。

Kafka 提供了高性能、可靠、持久的消息传递系统,适用于大规模的实时数据处理和流式处理应用程序。它已经成为许多企业构建实时数据管道和流处理应用程序的首选工具之一。

二,运行原理

Kafka 的运行原理涉及多个组件和过程,主要包括生产者发送消息、消息存储在代理 (Broker) 中的分区中、消费者从分区中读取消息等。以下是 Kafka 的基本运行原理:

  1. 生产者发送消息:
    • 生产者将消息发送到 Kafka 的特定主题 (Topic) 中。
    • 生产者可以选择性地将消息发送到特定的分区 (Partition),或者使用 Kafka 的默认分区分配策略,由 Kafka 在发送时决定将消息发送到哪个分区。
  2. 消息存储在分区中:
    • 主题可以被分成多个分区,每个分区是一个有序的消息序列。
    • 每条消息在分区内有一个唯一的偏移量 (Offset),用于标识消息在分区中的位置。
    • 消息被持久化在 Kafka 的分区中,直到满足一定的保留策略(如时间或者大小限制)。
  3. 消费者从分区中读取消息:
    • 消费者从 Kafka 的特定主题中读取消息。
    • 消费者可以以不同的方式订阅主题,例如:
      • 指定订阅的主题和分区。
      • 加入一个消费者组 (Consumer Group),使得消费者可以以并行的方式消费主题中的消息。
    • 每个消费者在消费主题时,会维护自己的消费偏移量 (Offset),用于记录已经消费的消息位置。
  4. 分区和副本管理:
    • Kafka 使用分区来实现并行处理和水平扩展。
    • 每个分区可以有多个副本,其中一个是领导者副本 (Leader),其余的是追随者副本 (Follower)。
    • 领导者副本负责处理读写请求,追随者副本用于备份数据。
    • 如果领导者副本失效,Kafka 会从追随者副本中选举新的领导者。
  5. ZooKeeper 协调:
    • Kafka 集群依赖 ZooKeeper 来进行集群管理、元数据存储和领导者选举等任务。
    • ZooKeeper 存储了 Kafka 集群的元数据,包括主题、分区、消费者组等信息,同时也用于监控和管理集群的健康状态。

通过这些组件和过程,Kafka 实现了高吞吐量、持久性、分布式和水平扩展等特性,使得它成为处理大规模实时数据流的理想选择。

三,Spring Boot 项目中整合 Kafka (简单使用)

1.添加 Maven 依赖

首先,在你的 Spring Boot 项目的 pom.xml 文件中添加 Spring Kafka 的依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
2.配置 Kafka 连接信息

application.propertiesapplication.yml 中添加 Kafka 服务器的连接信息:

  kafka:
    bootstrap-servers: 192.168.193.131:9092
    producer:  #生产者序列化器
      retries: 10 #如果发生故障,生产者将尝试重新发送消息的次数。
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化生产者消息键的类。
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化生产者消息值的类。
      ack-mode: manual
    consumer: #消费者序列化器
      group-id: ${spring.application.name}-test # 消费者组的唯一标识符。在消费者组中的所有消费者将共享消费者组的工作负载。
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #用于反序列化消费者消息键的类。
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #用于反序列化消费者消息值的类。
    listener: #配置了监听器相关的设置。
      ack-mode: manual #开启手动确认 设置为手动,表示消费者将等待手动确认来确定是否已成功处理消息。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3.创建kafka配置类:

配置 Kafka 主题(topic)的创建

@Configuration
public class KafkaConfig {

    /**
     * @return org.apache.kafka.clients.admin.NewTopic
     * @date 2024/5/31 14:42
     * @Description: TODO @Bean 注解
     * 作用:将方法返回的对象注册为 Spring 容器中的一个 Bean。
     * 返回值类型:NewTopic,表示 Kafka 主题的配置信息。
     */
    @Bean
    public NewTopic viewUserTopic(){
        /*第一个参数:主题名称,这里是 "viewUserTopic"。
           第二个参数:分区数量,这里设置为 1。
           第三个参数:副本数量,这里设置为 1。*/
        return new NewTopic("viewUserTopic",1,(short) 1);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
4.注入kafka模板类
@Autowired
private KafkaTemplate kafkaTemplate;
  • 1
  • 2
5.发送消息
//定义消息的唯一ID 防止消息重复消费
String msgId = "msg-" + UUID.randomUUID().toString();
//定义消息内容
String msgBody = JSON.toJSONString(tbUser);

//将消息唯一表示存入redis缓存  防止消息重复消费
stringRedisTemplate.opsForValue().set(msgId, msgBody);

/*组装消息体 发送消息队列*/
MessageVO messageVO = new MessageVO();
messageVO.setMsgID(msgId);
messageVO.setMsgBody(msgBody);

//向名为 "viewUserTopic" 的 Kafka 主题发送消息。
 //参数一: 表示目标 Kafka 主题的名称。   参数二:消息内容
 kafkaTemplate.send("viewUserTopic", JSON.toJSONString(messageVo))
                //通过该方法设置回调函数,用于处理消息发送的成功和失败情况。
                .addCallback(
                        //成功回调函数,处理消息发送成功的情况。
                new SuccessCallback() {
                    @Override
                    public void onSuccess(Object o) {
                        // 消息发送成功
                        System.out.println("kafka 消息发送成功了~~~~~~~~~~~~");
                    }
                },
                        //失败回调函数,处理消息发送失败的情况。
                new FailureCallback() {
                    @Override
                    public void onFailure(Throwable throwable) {
                        // 消息发送失败了,再次发送
                        System.out.println("kafka 消息发送失败了,再次发送");
                        kafkaTemplate.send("viewUserTopic", JSON.toJSONString(messageVo));
                    }
                }
        );
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
6.消息的接收(监听)
/**
     * @param message 表示接收到的消息内容,这里是 JSON 格式的字符串。
     * @param acknowledgment 用于手动提交消费者偏移量的对象。
     * @date 2024/5/31 15:13
     * @Description: TODO
     * @KafkaListener 通过该注解指定了监听的 Kafka 主题为 "viewUserTopic"。
     */
    @KafkaListener(topics = "viewUserTopic")
    public void recvViewUserMessage(String message, Acknowledgment acknowledgment) {

        //--1 接收消息
        MessageVO messageVo = JSON.parseObject(message, MessageVO.class);

        //--2 根据消息的唯一ID,判断消息是否重复
        String msgId = messageVo.getMsgID();
        if (!stringRedisTemplate.hasKey(msgId)) {
            // 消息重复了
            System.out.println("kafka 消息重复了");
            // 使用 acknowledgment.acknowledge() 方法手动确认消费完成,通知 Kafka 服务器该消息已经被处理。
            acknowledgment.acknowledge();

            return;
        }
        //--3 消费消息(处理消息)
        String msgBody = messageVo.getMsgBody();
        TbLog tbLog = JSON.parseObject(msgBody, TbLog.class);
        tbLog.setCreateTime(new Date());
        tbLogMapper.insert(tbLog);

        //--4 手动确认消息
        //手动确认消费完成,通知 Kafka 服务器该消息已经被处理。
        acknowledgment.acknowledge();

        //--5 删除消息的唯一ID,防止消息重复消费
        stringRedisTemplate.delete(msgId);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号