当前位置:   article > 正文

Kafka 安装、使用_kafka客户端

kafka客户端

目录

Kafka安装

概念

简单使用

创建topic

发送消息

消费消息

消息的细节

单播消息

多播消息

 查看消费组的详细信息

主题和分区的概念

1.主题topic

2.分区partition 

创建多分区的主题

查看主题的分区消息

消息日志文件

kafka集群及副本的概念

kafka集群搭建

副本

集群消费

1.向集群发送消息:

2.从集群中消费消息(不带消费组)

3.指定消费组消费消息

4.多分区,多消费组消费消息

Kafka安装

kafka 安装_naki_bb的博客-CSDN博客

概念

名称
解释
Broker
消息中间件处理节点,一个 Kafka 节点就是一个 broker ,一个或者多个Broker可以组成一个 Kafka 集群
Topic
Kafka 根据 topic 对消息进⾏归类,发布到 Kafka 集群的每条消息都需要指定 一个topic
Producer
消息生产者,向 Broker 发送消息的客户端
Consumer
消息消费者,从 Broker 读取消息的客户端

简单使用

创建topic

  • 通过kafka命令向zk中创建一个名为test的主题
  1. cd /opt/kafka_2.13-2.8.1/bin
  2. #创建test主题
  3. ./kafka-topics.sh --create --zookeeper 192.168.99.100:2181 --replication-factor 1 --partitions 1 --topic test

  • 查看当前zk中所有的主题
./kafka-topics.sh --list --zookeeper 192.168.99.100:2181

 注意指令中的ip和端口都是zookeeper的

发送消息

kafka自 带了一个 producer 命令客户端,可以从本地文件中读取内容,或者我们也可以以命令
行中直接输⼊内容,并将这些内容以消息的形式发送到 kafka 集群中。在默认情况下,每一行会被当做成一个独立的消息。使用kafka 的发送消息的客户端,指定发送到的 kafka 服务器地址和topic
./kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic test

消费消息

对于 consumer kafka 同样也携带了一个命令行客户端,会将获取到内容在命令中进行输
出, 默认是消费最新的消息 。使用 kafka 的消费者消息的客户端,从指定 kafka 服务器的指定
topic 中消费消息
  • 方式一:从最后一条消息的偏移量+1开始消费

就是只能接受到连接之后,发送方发送的消息,连接之前的消息,接受不到 

./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --topic test

   可以发现连接之前的消息没有收到

  • 方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --from-beginning --topic test

经过以上两个方式的测试,可以发现:
  • 消息会被存储
  • 消息是顺序存储
  • 消息是有偏移量的
  • 消费时可以指明偏移量进行消费

消息的细节

  •  生产者将消息发送给broker,broker会将消息保存在本地的日志文件中

具体的日志目录在service.properies中配置

 在日志文件目录中可以查看到topic的信息,索引,时间等信息

  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置

单播消息

在kafka的一个topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否
同时会被两个消费者消费?
  • 在没有消费组的情况下,消费者都可以收到消息,进行消费
  • 如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息,换言之,同一个消费组中只能有一个消费者收到topic中的消息
./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --consumer-property group.id=testGroup --topic test

多播消息

不同的消费组订阅同一个topic,那么每个消费组中只有个一个消费者能收到消息。

  1. ./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --consumer-property group.id=testGroup1 --topic test
  2. ./kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --consumer-property group.id=testGroup2 --topic test
下图就是描述多播和单播消息的区别

 查看消费组的详细信息

  1. # 查看当前主题下有哪些消费组
  2. ./kafka-consumer-groups.sh --bootstrap-server 192.168.99.100:9092 --list
  3. # 查看消费组中的具体信息:比如当前偏移量、最后⼀条消息的偏移量、堆积的消息数量
  4. ./kafka-consumer-groups.sh --bootstrap-server 192.168.99.100:9092 --describe --group testGroup

  • Currennt-offset: 当前消费组的已消费偏移量
  • Log-end-offset: 消息总量(最后一条消息的偏移量)
  • Lag: 当前消费组未消费的消息数

主题和分区的概念

1.主题topic

主题 -topic kafka 中是一个逻辑的概念, kafka 通过 topic 将消息进行分类。不同的 topic 会被
订阅该 topic 的消费者消费。
但是有一个问题,如果说这个 topic 中的消息非常非常多,多到需要几 T 来存,因为消息是会被
保存到 log日志文件 中的。为了解决这个文件过大的问题, kafka 提出了 Partition 分区的概念

2.分区partition 

分区的作用:
  • 可以分布式存储
  • 可以并行写
通过 partition 将一个 topic 中的消息在不同的分区来存储。这样的好处有多个:
  • 分区存储,可以解决统一存储文件过大的问题
  • 提供了读写的吞吐量:读和写可以同时在多个分区中进行

上图中一个topic 创建了 3 个分区。那么 topic 中的消息就会分别存放在这三个分区中,所有分区的消息总和才是这个topic的全部消息。

创建多分区的主题

./kafka-topics.sh --create --zookeeper 192.168.99.100:2181 --replication-factor 1 --partitions 2 --topic test1

通过--partitions 参数设置分区个数,来表示用多少个分区存储这个topic的消息

查看主题的分区消息

./kafka-topics.sh --describe --zookeeper 192.168.99.100:2181 --topic test1

消息日志文件

  • 00000.log: 这个文件中保存的就是消息
  • __consumer_offsets-49:
    kafka 内部自己创建了 __consumer_offsets 主题包含了 50 个分区。这个主题用来存放所有消费
    者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,每个消费者会把消费的主题的偏移量自主上报给 kafka中的默认主题: consumer_offsets。因此 kafka 为了提升这个主题的并发性,默认设置了 50 个分区
  1. 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数
  2. 提交到该主题中的内容是:keyconsumerGroupId+topic+分区号,value就是当前 offset的值
  • 文件中保存的消息,默认保存 7 天。七天到后消息会被删除。

kafka集群及副本的概念

kafka集群搭建

kafka 集群_naki_bb的博客-CSDN博客

副本

副本是对分区的备份。在集群中,不同的副本会被部署在不同的 broker 上。
下面的例子:创建 1个主题,2 个分区、 3 个副本。
./kafka-topics.sh --create --zookeeper 192.168.99.100:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

--replication-factor表示副本数,只有broker的数量>副本数,才有意义

副本是为了为主题中的分区创建多个备份,多个副本在 kafka 集群的多个 broker 中,会有一个
副本作为 leader ,其他是 follower
查看 topic 情况:
  1. # 查看topic情况
  2. ./kafka-topics.sh --describe --zookeeper 192.168.99.100:2181 --topic myreplicated-topic

通过查看主题信息,其中的关键数据:
  • replicas: 当前副本存在的broker节点
  • leader:副本里的概念 ,每个partition都有1个broker作为leader。kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader了,经过主从选举,从多个follower中选举产生一个新的leader
  • follwer : 接受leader的同步的数据
  • isr : 可以同步和已同步的节点会存入到isr集合中,如果isr中的节点性能较差,会被提出isr集合。
集群中有多个 broker ,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker里

集群消费

1.向集群发送消息:

./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic

2.从集群中消费消息(不带消费组)

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --topic my-replicated-topic

3.指定消费组消费消息

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

4.多分区,多消费组消费消息

  • 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性,但是多个partion的多个消费者消费的总的顺序性是得不到保证的
  • partition 的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition 的数量,否则多的消费者消费不到消息
  • 如果消费者挂了,那么会触发 rebalance 机制,会让同一个group的其他消费者来消费该分区需要消费的消息​​​​​​​

集群Controller、Rebalance、和HW

Controller

Kafka集群中broker在zookeeper中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群中的controller,负责管理整个集群中的所有分区和副本的状态:

  • 当某个分区的leader副本出现故障时,由controller控制器负责为该分区选取新的leader副本,选举的规则是从isr集合中最左边获得。
  • 当集群中有broker新增或者减少,controller会同步信息给其他broker
  • 当集群中有分区的新增和减少,controller会同步信息给其他broker

Rebalance机制

前提:消费组的消费者没有指定分区来消费

触发条件:当消费组中的消费者和分区发生变化的时候

分区分配的策略:在rebalance之前,分区怎么分配会有以下3中策略

range:根据公示计算得到每个消费者消费哪几个分区:剩余分区总数 / 消费者数量 +1

轮询 :每个消费者轮询所有分区

sticky:粘合策略,如果需要rebalance,会在之前的已分配的基础上调整,不会改变之前的分配状况。如果这个策略没有打开,那么就要进行全部的重新分配,建议开启。

HW 和 LEO

LEO是某个副本最后消息的位置,每一个部分都有自己的LEO

HW是已完成同步的位置,消息在写入broker时,且每个broker完成这条消息的同步后(更新了最新消息的LEO),HW才会变化。在这之前消费者是消费不到这条消息的,在同步完成以后,HW在更新,更新完成消费者才能消费到这条消息,这样的目的是防止消息的丢失(当leader所在的broker失效后,该消息仍然可以从新的选举的leader中获取到)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/758329
推荐阅读
相关标签
  

闽ICP备14008679号