当前位置:   article > 正文

kafka基础知识_kafka用什么语言开发的

kafka用什么语言开发的

kafka基础知识

1. 简介

kafka 是最初由 linkedin 公司开发的,使用 scala 语言编写,kafka 是一个分布式,分区的,多副本的,

多订阅者的日志系统(分布式 MQ 系统),可以用于搜索日志,监控日志,访问日志等。

2. 支持的语言

kafka 目前支持多种客户端的语言:java、python、c++、php 等

3. apache kafka 是一个分布式发布**-**订阅消息系统

apache kafka 是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将

消息从一个端点传递到另一个端点,kafka 适合离线和在线消息消费。kafka 消息保留在磁盘上,并在

集群内复制以防止数据丢失。kafka 构建在 zookeeper 同步服务之上。它与 apache 和 spark 非常好的

集成,应用于实时流式数据分析。

4. 其他的消息队列

RabbitMQ

Redis

ZeroMQ

ActiveMQ

5. kafka的好处

可靠性:分布式的,分区,复制和容错的。

可扩展性:kafka 消息传递系统轻松缩放,无需停机。

耐用性:kafka 使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是

持久的。

性能:kafka 对于发布和定于消息都具有高吞吐量。即使存储了许多 TB 的消息,他也爆出稳

定的性能。

kafka 非常快:保证零停机和零数据丢失。

6. kafka应用场景

6.1、指标分析

kafka 通常用于操作监控数据。这设计聚合来自分布式应用程序的统计信息, 以产生操作的数据

集中反馈

6.2、日志聚合解决方法

kafka 可用于跨组织从多个服务器收集日志,并使他们以标准的合适提供给多个服务器。

6.3、流式处理

流式处理框架(spark,storm,flflink)重主题中读取数据,对齐进行处理,并将处理后的数据写入新

的主题,供 用户和应用程序使用,kafka 的强耐久性在流处理的上下文中也非常的有用。

7. kafka 架构

7.1 kafka 四大核心

  1. 生产者API

    允许应用程序发布记录流至一个或者多个 kafka 的主题(topics)。

  2. 消费者API

    允许应用程序订阅一个或者多个主题,并处理这些主题接收到的记录流。

  3. StreamAPI

    允许应用程序充当流处理器(stream processor),从一个或者多个主题获取输入流,并生产一个输出

    流到一个或 者多个主题,能够有效的变化输入流为输出流。

  4. ConnectorAPI

允许构建和运行可重用的生产者或者消费者,能够把 kafka 主题连接到现有的应用程序或数据系统。

例如:一个连 接到关系数据库的连接器可能会获取每个表的变化。

7.2 架构关系图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mMeWVYHN-1656401089030)(…/…/…/AppData/Roaming/Typora/typora-user-images/image-20220623165612253.png)]

说明:kafka 支持消息持久化,消费端为拉模型来拉取数据,消费状态和订阅关系有客户端负责维护,消

息消费完 后,不会立即删除,会保留历史消息。因此支持多订阅时,消息只会存储一份就可以了。

7.3 kafka整体架构图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fJbeuDdy-1656401089031)(…/…/…/AppData/Roaming/Typora/typora-user-images/image-20220623165825047.png)]

7.4 kafka架构说明

一个典型的 kafka 集群中包含若干个 Producer,若干个 Broker,若干个 Consumer,以及一个

zookeeper 集群; kafka 通过 zookeeper 管理集群配置,选举 leader,以及在 Consumer Group

发生变化时进行 Rebalance(负载均 衡);Producer 使用 push 模式将消息发布到 Broker;

Consumer 使用 pull 模式从 Broker 中订阅并消费消息。

8. kafka术语

8.1 术语介绍

Broker:kafka 集群中包含一个或者多个服务实例,这种服务实例被称为 Broker

Topic:每条发布到 kafka 集群的消息都有一个类别,这个类别就叫做 Topic

Partition:Partition 是一个物理上的概念,每个 Topic 包含一个或者多个 Partition

Producer:负责发布消息到 kafka 的 Broker 中。

Consumer:消息消费者,向 kafka 的 broker 中读取消息的客户端

Consumer Group:每一个 Consumer 属于一个特定的 Consumer Group(可以为每个 Consumer

指定 groupName)

8.2 kafka topic 说明

kafka 将消息以 topic 为单位进行归类

topic 特指 kafka 处理的消息源(feeds of messages)的不同分类。

topic 是一种分类或者发布的一些列记录的名义上的名字。kafka 主题始终是支持多用户订阅的;

也就是说,一 个主题可以有零个,一个或者多个消费者订阅写入的数据。

在 kafka 集群中,可以有无数的主题。

生产者和消费者消费数据一般以主题为单位。更细粒度可以到分区级别。

8.3 kafka 中分区数(Partitions)

Partitions**:分区数**

Partitions:分区数:控制 topic 将分片成多少个 log,可以显示指定,如果不指定则会使用 
  • 1

一个 broker 服务下,是否可以创建多个分区?

可以的,broker 数与分区数没有关系; 在

kafka 中,每一个分区会有一个编号:编号从 0 开始

某一个分区的数据是有序的

说明-数据是有序 如何保证一个主题下的数据是有序的?(生产是什么样的顺序,那么消费的时

候也是什么样的顺序)

topic 的 Partition 数量在创建 topic 时配置。

Partition 数量决定了每个 Consumer group 中并发消费者的最大数量。

Consumer group A 有两个消费者来读取 4 个 partition 中数据;Consumer group B 有四个

消费者来读取 4 个 partition 中的数据

8.4 kafka **中副本数(**Partition Replication)

kafka 分区副本数(kafka Partition Replicas)

副本数(replication-factor)

副本数(replication-factor):控制消息保存在几个 broker(服务器)上,一般情况下等于broker 的个数
  • 1

一个 broker 服务下,是否可以创建多个副本因子?

不可以;创建主题时,副本因子应该小于等于可用的 broker 数。

副本因子操作以分区为单位的。每个分区都有各自的主副本和从副本;主副本叫做 leader,

从副本叫做 follower(在有多个副本的情况下,kafka 会为同一个分区下的分区,设定角色

关系:一个 leader 和 N 个 follower),处于同步状态的副本叫做 in-sync

replicas(ISR);follower 通过拉的方式从 leader 同步数据。消费 者和生产者都是从 leader 读写

数据,不与 follower 交互。

副本因子的作用:让 kafka 读取数据和写入数据时的可靠性。

副本因子是包含本身|同一个副本因子不能放在同一个 Broker 中。

如果某一个分区有三个副本因子,就算其中一个挂掉,那么只会剩下的两个钟,选择一个

leader,但不会在其 他的 broker 中,另启动一个副本(因为在另一台启动的话,存在数据传

递,只要在机器之间有数据传递,就 会长时间占用网络 IO,kafka 是一个高吞吐量的消息系

统,这个情况不允许发生)所以不会在零个 broker 中启 动。

如果所有的副本都挂了,生产者如果生产数据到指定分区的话,将写入不成功。

lsr 表示:当前可用的副本

8.5 kafka Partition offffset

任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称为

offffset(偏移量),

offffset 是一个 long 类型数字,它唯一标识了一条消息,消费者通过(offffset,partition,topic)跟

踪记录

8.6 kafka 分区和消费组之间的关系

消费组: 由一个或者多个消费者组成,同一个组中的消费者对于同一条消息只消费一次

某一个主题下的分区数,对于消费组来说,应该小于等于该主题下的分区数。如下所示:

如:某一个主题有 4 个分区,那么消费组中的消费者应该小于 4,而且最好与分区数成整数倍

1 2 4

同一个分区下的数据,在同一时刻,不能同一个消费组的不同消费者消费

总结:分区数越多,同一时间可以有越多的消费者来进行消费,消费数据的速度就会越快,提高消费的性能

9. kafka 集群的搭建

9.1 准备三台机器

192.168.100.100 node01 
192.168.100.110 node02 
192.168.100.120 node03 
  • 1
  • 2
  • 3

9.2 初始化环境

安装jdk、安装zookeeper

9.3 kafka集群安装

9.3.1 下载地址

http://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz

9.3.2 上传到linux服务器并解压

将下载好的安装包上传到 node01 服务器的/export/softwares 路径下,然后进行解压

node01 执行以下命令进行解压安装包

cd /export/softwares 
tar -zxvf kafka_2.11-0.10.0.0.tgz -C /export/servers/ 
  • 1
  • 2
9.3.3 修改配置文件

node01 执行以下命令进入到 kafka 的配置文件目录,修改配置文件

cd /export/servers/kafka_2.11-0.10.0.0/config
vim server.properties
  • 1
  • 2
broker.id=0 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
log.dirs=/export/servers/kafka_2.11-0.10.0.0/logs 
num.partitions=2 
num.recovery.threads.per.data.dir=1 
offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
log.flush.interval.messages=10000 
log.flush.interval.ms=1000 
log.retention.hours=168 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=node01:2181,node02:2181,node03:2181 
zookeeper.connection.timeout.ms=6000 
group.initial.rebalance.delay.ms=0 
delete.topic.enable=true 
host.name=node01 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

node01 执行以下命令创建数据文件存放目录

mkdir -p /export/servers/kafka_2.11-0.10.0.0/logs 
  • 1
9.3.4 分发安装包

node01 执行以下命令,将 node01 服务器的 kafka 安装包发送到 node02 和 node03 服务器上面去

cd /export/servers/ 
scp -r kafka_2.11-0.10.0.0/ node02:$PWD 
scp -r kafka_2.11-0.10.0.0/ node03:$PWD 
  • 1
  • 2
  • 3
9.3.5 node02 node03 服务器修改配置文件

node02 与 node03 服务器修改 kafka 配置文件

node02 使用以下命令修改 kafka 配置文件

cd /export/servers/kafka_2.11-0.10.0.0/con·fig 
vim server.properties 
  • 1
  • 2
broker.id=1 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 

log.dirs=/export/servers/kafka_2.11-0.10.0.0/logs 
num.partitions=2 
num.recovery.threads.per.data.dir=1 
offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
log.flush.interval.messages=10000 
log.flush.interval.ms=1000 
log.retention.hours=168 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=node01:2181,node02:2181,node03:2181 
zookeeper.connection.timeout.ms=6000 
group.initial.rebalance.delay.ms=0 
delete.topic.enable=true 
host.name=node02 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

node03 使用以下命令修改 kafka 配置文件

cd /export/servers/kafka_2.11-0.10.0.0/config 

vim server.properties 
  • 1
  • 2
  • 3
broker.id=2 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
log.dirs=/export/servers/kafka_2.11-0.10.0.0/logs 
num.partitions=2 
num.recovery.threads.per.data.dir=1 
offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
log.flush.interval.messages=10000 
log.flush.interval.ms=1000 
log.retention.hours=168 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=node01:2181,node02:2181,node03:2181 
zookeeper.connection.timeout.ms=6000 
group.initial.rebalance.delay.ms=0 
delete.topic.enable=true 
host.name=node03 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

9.4 启动集群

注意事项: 在kafka启动前,一定要让zookeeper启动起来。

9.4.1 启动命令-前台启动

node01 服务器执行以下命令来启动 kafka 集群

cd /export/servers/kafka_2.11-0.10.0.0 
bin/kafka-server-start.sh config/server.properties 
  • 1
  • 2
9.4.2 启动命令-后台启动

node01 执行以下命令将 kafka 进程启动在后台

cd /export/servers/kafka_2.11-0.10.0.0 
nohup bin/kafka-server-start.sh config/server.properties 2>&1 & 
  • 1
  • 2
9.4.3 停止命令

node01 执行以下命令便可以停止 kakfa 进程

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-stop.sh 
  • 1
  • 2

10 kafka集群操作

10.1 控制台操作

10.1.1 创建topic

创建了一个名字为 test 的主题, 有三个分区,有两个副本

node01 执行以下命令来创建 topic

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --parrtitions 3 --topic test
  • 1
  • 2
10.1.2 查看主题命令

查看 kafka 当中存在的主题

node01 使用以下命令来查看 kafka 当中存在的 topic 主题

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181 --topic test
  • 1
  • 2
10.1.3 生产者生产数据

模拟生产者来生产数据

node01 服务器执行以下命令来模拟生产者进行生产数据

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092
  • 1
  • 2
10.1.4 消费者消费数据

node2服务器执行一下命令来模拟消费者进行消费数据

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181
  • 1
  • 2
10.1.5 查看topics的相关信息

node01 执行以下命令运行 describe 查看 topic 的相关信息

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
  • 1
  • 2
10.1.6 修改topic属性
  1. 增加topic分区数

​ 任意 kafka 服务器执行以下命令可以增加 topic 分区数

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
  • 1
  • 2
  1. 增加配置

    动态修改kafka的配置

    任意kafka服务器执行以下命令可以增加topic分区数

    cd /export/servers/kafka_2.11-0.10.0.0 
    bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
    
    • 1
    • 2
  2. 删除配置

    动态删除kafka集群配置

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
  • 1
  • 2
  1. 删除topic

    目前删除 topic 在默认情况下知识打上一个删除的标记,在重新启动 kafka 后才删除。如果需要立即删除,则需要在

    server.properties 中配置:

delete.topic.enable=true
  • 1

​ 然后执行以下命令进行删除 topic

kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
  • 1

11. kafka原理部分

11.1 生产-消费流程

数据从生产-消费-提交 offffset 过程,分以下几个阶段来进行

11.2 生产者

生产者是一个向kafka Cluster发布记录的客户端;生产者是线程安全的,跨线程共享单个生产者实例通常比具有多个实例更快。

11.2.2 必要条件

生产者要进行生产数据到kafka Cluster中,必要条件有以下三个:

# 1. 地址
bootstrap.servers=node01:9092
# 2. 序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
  • 1
  • 2
  • 3
  • 4
  • 5
11.2.3 生产者(Producer)写数据

流程描述:

​ 1、总体流程:

​ Producer 连接任意活着的 Broker,请求指定 Topic,Partion 的 Leader 元数据信息,然后直接与对应的 Broker 直接连接,发布数据

​ 2、开放分区接口(生产者数据分发策略):

​ 2.1、用户可以指定分区函数,使得消息可以根据 key,发送到指定的 Partition 中。

​ 2.2、kafka 在数据生产的时候,有一个数据分发策略。默认的情况使用 DefaultPartitioner.class 类。

​ 这个类中就定义数据分发的策略。

​ 2.3、如果是用户制定了 partition,生产就不会调用 DefaultPartitioner.partition()方法

​ 2.4、当用户指定 key,使用 hash 算法。如果 key 一直不变,同一个 key 算出来的 hash 值是个固定值。如果是固定 值,这种 hash 取模就没有意义

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
  • 1

​ 2.5 、 当用既没有指定 partition 也没有 key。

​ 2.6、数据分发策略的时候,可以指定数据发往哪个 partition。当 ProducerRecord 的构造参数中有 partition 的时 候,就可以发送到对应 partition 上。

11.2.4 生产者数据分发策略

​ 生产者数据分发策略有如下四种:(总的来说就是调用了一个方法,参数不同而已)

// 可根据主题和内容发送
public ProduceRecord(String topic,V value)
// 可根据主题,key、内容发送
public ProducerRecord(String topic, K key V value)
// 根据主题、分区、key、内容发送
public ProducerRecord(String topic,Integer partition , K key , V value)
// 根据主题、分区、时间戳、key、内容发送
public ProducerRecord(String topic,Integer partition,Locng timestamp,K key , V value)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

a、可根据主题和内容发送

Producer<String, String> producer = new KafkaProducer<String, String>(props);
//可根据主题和内容发送
producer.send(new ProducerRecord<String, String>("my-topic","具体的数据"));
  • 1
  • 2
  • 3

b、根据主题,key、内容发送

Producer<String, String> producer = new KafkaProducer<String, String>(props);
//可根据主题、key、内容发送
producer.send(new ProducerRecord<String, String>("my-topic","key","具体的数据"));
  • 1
  • 2
  • 3

c、根据主题、分区、key、内容发送

Producer<String, String> producer = new KafkaProducer<String, String>(props);
//可根据主题、分区、key、内容发送
producer.send(new ProducerRecord<String, String>("my-topic",1,"key","具体的数据"));
  • 1
  • 2
  • 3

d、根据主题、分区、时间戳、key,内容发送

Producer<String, String> producer = new KafkaProducer<String, String>(props);
//可根据主题、分区、时间戳、key、内容发送
producer.send(new ProducerRecord<String, String>("my-topic",1,12L,"key","具体的数据"));
  • 1
  • 2
  • 3

11.3 消费者

消费者是一个从 kafka Cluster 中消费数据的一个客户端;该客户端可以处理 kafka brokers 中的故障问题,并且可以适应在集群内的迁移的 topic 分区;该客户端还允许消费者组使用消费者组来进行负载均衡。

消费者维持一个 TCP 的长连接来获取数据,使用后未能正常关闭这些消费者问题会出现,因此消费者不是线程安全的

11.3.1 必要条件

消费者要从 kafka Cluster 进行消费数据,必要条件有以下四个

# 1. 地址
bootstrap.servers=node01:9092
# 2. 序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# 3. 主题topic,需要制定具体的某个topic(order)即可.
# 4. 消费组 group.id=test
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
11.3.2 消费者代码 - 自动提交offset值
/**
* 消费订单数据--- javaben.tojson
*/
public class OrderConsumer {
public static void main(String[] args) {
// 1\连接集群
Properties props = new Properties(); props.put("bootstrap.servers", "hadoop-01:9092"); 
props.put("group.id", "test");
//以下两行代码 ---消费者自动提交 offset 值 
props.put("enable.auto.commit", "true"); 
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String> (props);
// 2、发送数据 发送数据需要,订阅下要消费的 topic。 order 
kafkaConsumer.subscribe(Arrays.asList("order"));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
// jdk queue offer插入、poll 获取元素。 blockingqueue put 插入原生, take 获取元素
for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println("消费的数据为:" + record.value());
			}
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
11.3.3 消费者代码-手动提交offset

如果 Consumer 在获取数据后,需要加入处理,数据完毕后才确认 offffset,需要程序来

控制 offffset 的确认? 关闭自动提交确认选项

props.put("enable.auto.commit", "false");
  • 1

手动提交 offffset 值

kafkaConsumer.commitSync();
  • 1

完整代码如下所示:

props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test");
//关闭自动提交确认选项
props.put("enable.auto.commit", "false"); 
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); 
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
		}
if (buffer.size() >= minBatchSize) { insertIntoDb(buffer);
// 手动提交 offset 值
consumer.commitSync(); 
buffer.clear();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
11.3.4 消费者代码完成处理每个分区中的记录后提交偏移量

上面的示例使用 commitSync 将所有已接收的记录标记为已提交。 在某些情况下,您

可能希望通过明确指定偏移量 来更好地控制已提交的记录。 在下面的示例中,我们在完成处理每个分区中的记录后提交偏移量。

try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
for (TopicPartition partition : records.partitions()) {
	List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
	System.out.println(record.offset() + ": " + record.value());
			}
long lastOffset = partitionRecords.get(partitionRecords.size()-1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
		}
	}
} finally { 
consumer.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

注意事项:

​ 提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。 因此,在调用commitSync(偏移量)时,应该 在最后处理的消息的偏移量中添加一个.

11.3.5 消费者代码使用消费者消费指定分区的数据

1、如果进程正在维护与该分区关联的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上 维护的分区的记录。

2、如果进程本身具有高可用性,并且如果失败则将重新启动(可能使用 YARN,Mesos 或 AWS 工具等集群管理框 架,或作为流处理框架的一部分)。 在这种情况下,Kafka 不需要检测故障并重新分配分区,因为消耗过程将在另 一台机器上重新启动。

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test"); 
props.put("enable.auto.commit", "true"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList("foo", "bar"));
//手动指定消费指定分区的数据---start
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0); 
TopicPartition partition1 = new TopicPartition(topic, 1); 
consumer.assign(Arrays.asList(partition0, partition1));
//手动指定消费指定分区的数据---end
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); 
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), 
record.value());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

注意事项:

1、要使用此模式,您只需使用要使用的分区的完整列表调用 assign(Collection),而不

是使用 subscribe 订阅 主题。

2、主题与分区订阅只能二选一。

11.3.6 消费者数据丢失-数据重复

说明:

  1. 已经消费的数据对于 kafka 来说,会将消费组里面的 offset 值进行修改,那什么时候进行修改了?是在数据消费 完成之后,比如在控制台打印完后自动提交;

  2. 提交过程:是通过 kafka 将 offset 进行移动到下个 message 所处的 offset 的位置。

  3. 拿到数据后,存储到 hbase 中或者 mysql 中,如果 hbase 或者 mysql 在这个时候连接不上,就会抛出异常,如果在处理数据的时候已经进行了提交,那么 kafka 伤的offffset 值已经进行了修改了,但是 hbase 或者 mysql 中没有数据,这个时候就会出现数据丢失

  4. 什么时候提交 offset 值?在 Consumer 将数据处理完成之后,再来进行 offset 的修改提交。默认情况下 offset 是 自动提交,需要修改为手动提交 offset 值。

  5. 如果在处理代码中正常处理了,但是在提交 offset 请求的时候,没有连接到 kafka 或

    者出现了故障,那么该次修 改 offset 的请求是失败的,那么下次在进行读取同一个分区

    中的数据时,会从已经处理掉的 offset 值再进行处理一 次,那么在 hbase 中或者 mysql

    中就会产生两条一样的数据,也就是数据重复

11.4 kafka的log-存储机制

12. kafka消息不丢失制

12.1 生产者生产数据不丢失

12.1.1生产者数据不丢失过程图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DgLAbgTQ-1656401089031)(…/…/…/AppData/Roaming/Typora/typora-user-images/image-20220628151941411.png)]

说明:有多少个分区,就启动多少个线程来进行同步数据

12.1.2 发送数据方式

可以采用同步或者异步的方式-过程图

可以采用同步或者异步的方式

同步:发送一批数据给 kafka 后,等待 kafka 返回结果

1、生产者等待 10s,如果 broker 没有给出 ack 相应,就认为失败。
2、生产者重试 3 次,如果还没有相应,就报错
  • 1
  • 2

异步:发送一批数据给 kafka,只是提供一个回调函数。

1、先将数据保存在生产者端的 buffer 中。buffer 大小是 2 万条 
2、满足数据阈值或者数量阈值其中的一个条件就可以发送数据。
3、发送一批数据的大小是 500 条
  • 1
  • 2
  • 3

说明:如果 broker 迟迟不给 ack,而 buffer 又满了,开发者可以设置是否直接清空buffer 中的数据。

12.1.3 ack 机制(确认机制)

生产者数据不抵事,需要服务端返回一个确认码,即 ack 响应码;ack 的响应有三个状态值

0:生产者只负责发送数据,不关心数据是否丢失,响应的状态码为 0(丢失的数据,需要再次发送 ) 
1:partition 的 leader 收到数据,响应的状态码为 1
-1:所有的从节点都收到数据,响应的状态码为-1
  • 1
  • 2
  • 3

说明:如果 broker 端一直不给 ack 状态,producer 永远不知道是否成功;producer可以设置一个超时时间 10s,超 过时间认为失败。

12.2 kafka 的 broker 中数据不丢失

在 broker 中,保证数据不丢失主要是通过副本因子(冗余),防止数据丢失

12.3 消费者消费数据不丢失

在消费者消费数据的时候,只要每个消费者记录好 offffset 值即可,就能保证数据不丢失。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/784964
推荐阅读
相关标签
  

闽ICP备14008679号