当前位置:   article > 正文

springboot集成kafka_springboot kafka consumer

springboot kafka consumer


Kafka介绍

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统

Kafka的基本术语

消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。

批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

主题:消息的种类称为 主题Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

在这里插入图片描述

生产者: 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。

消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

在这里插入图片描述

偏移量:偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。

副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。

重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka的特性(设计原则)

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发: 支持数千个客户端同时读写

Kafka的使用场景

  • 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
  • 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
  • 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 流式处理:流式处理是有一个能够提供多种应用程序的领域。
  • 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

Kafka的消息队列

Kafka 的消息队列一般分为两种模式:点对点模式和发布订阅模式

Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式

在这里插入图片描述

如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列

在这里插入图片描述

Kafka的系统架构

在这里插入图片描述

如上图所示,一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

Kafka的核心API

Kafka 有四个核心API,它们分别是:

  • Producer API,它允许应用程序向一个或多个 topics 上发送消息记录
  • Consumer API,允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
  • Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
  • Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改

在这里插入图片描述

本地kafka和zk环境

我们需要在本地启动一个单机版的kafka和zookeeper环境。kafka的安装包自带zookeeper,直接启动即可。

我的本地环境配置如下:

  • win10系统
  • kafka_2.12-2.5.0
  • zookeeper-3.4.12
  • spring boot 2.2.10.RELEASE

zookeeper的安装不做说明了,直接启动zk,端口是2181

在这里插入图片描述

启动成功后的窗口:

在这里插入图片描述

启动kafka,首先修改配置文件config文件下的server.properties文件

listeners=PLAINTEXT://127.0.0.1:9092

log.dirs=/kafka-logs 日志存储的文件夹

到kafka根目录下,输入命令 .\bin\windows\kafka-server-start.bat .\config\server.properties ,回车;启动成功的窗口。

在这里插入图片描述

springboot集成kafka

  • 依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 配置文件
server.port=7002
#Kafka集群
spring.kafka.bootstrap-servers=127.0.0.1:9092
#初始化生产者配置
#重试次数
spring.kafka.producer.retries=0
#应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01、all/-1)
spring.kafka.producer.acks=1
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
#批量大小
spring.kafka.producer.batch-size=16384
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#初始化消费者配置
#默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
#是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

先来解释下这几个配置,

  • bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
  • batch-size:当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
  • retries:若设置大于0的值,客户端会将发送失败的记录重新发送
  • buffer-memory:Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
  • key-serializer:关键字的序列化类
  • value-serializer:值的序列化类

代码

  • 生产者代码
package com.cn.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 消费者代码
package com.cn.controller;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    // 消费监听
    @KafkaListener(topics = {"topic1"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        log.info("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 只需要在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的topic。

  • kafka的消息再接收端会被封装成ConsumerRecord对象返回,它内部的value属性就是实际的消息。

测试

启动springboot项目,postman调接口触发生产者发送消息

在这里插入图片描述

  • 可以看到监听器消费成功

在这里插入图片描述

  • 以上就是一个简单的springboot集成kafka的demo,因为最近要用到关于kafka的知识,所以就学习一下,把自己学习到的新的知识用博客记录下来,印象会更深哦~
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/906492
推荐阅读
相关标签
  

闽ICP备14008679号