赞
踩
1)kafka定义
Kafka 被称为下一代分布式消息系统,由 Scala 和 Java编写,是非营利性组织ASF(Apache Software Foundation)基金会中的一个开源项目,比如:HTTP Server、Tomcat、Hadoop、ActiveMQ等开源软件都属于 Apache基金会的开源软件,类似的消息系统还有RabbitMQ、ActiveMQ、ZeroMQ。
Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家组织中同时投入生产协同工作。
2)传统消息队列应用场景
诸如电商业务中的秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列可提供削峰填谷的服务来解决该问题。
交易系统作为淘宝等电商的最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列可实现异步通信和应用解耦,确保主站业务的连续性。
细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列提供的顺序消息即保证消息FIFO。
交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
电商的大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列构建分布式缓存,实时通知商品数据的变化。
3)kafka特点和优势
特点:
优势:
4)kafka角色介绍
(1)Producer:Producer即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka broker
(2)Consumer:消费者,用于消费消息,即处理消息
Broker:Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例,假设每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如: broker-0、broker-1等……
(3)Topic :消息的主题,可以理解为消息的分类,一个Topic相当于数据库中的一张表,一条消息相当于关系数据库的一条记录,一个Topic或者相当于Redis中列表类型的一个Key,一条消息即为列表中的一个元素。kafka的数据就保存在topic。在每个broker上都可以创建多个topic。物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic的消息虽然保存于一个或多个broker 上, 但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处,topic 在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的topic 才能消费topic中的消息
(4)Consumer group: 每个consumer 属于一个特定的consumer group(可为每个consumer 指定 group name,若不指定 group name 则属于默认的group),同一topic的一条消息只能被同一个consumer group 内的一个consumer 消费,类似于一对一的单播机制,但多个consumer group 可同时消费这一消息,类似于一对多的多播机制
(5)Partition :是物理上的概念,每个topic 分割为一个或多个partition,即一个topic切分为多份.创建 topic时可指定 partition 数量,partition的表现形式就是一个一个的文件夹,该文件夹下存储该partition的数据和索引文件,分区的作用还可以实现负载均衡,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,一般Partition数不要超过节点数,注意同一个partition数据是有顺序的,但不同的partition则是无序的
(6)Replication: 同样数据的副本,包括leader和follower的副本数,基本于数据安全,建议至少2个,是Kafka的高可靠性的保障,和ES的副本有所不同,Kafka中的副本数包括主分片数,而ES中的副本数不包括主分片数
为了实现数据的高可用,比如将分区 0 的数据分散到不同的kafka 节点,每一个分区都有一个 broker 作为 Leader 和一个 broker 作为Follower,类似于ES中的主分片和副本分片。
假设分区为 3, 即分三个分区0-2,副本为3,即每个分区都有一个 leader,再加两个follower,分区 0 的leader为服务器A,则服务器 B 和服务器 C 为 A 的follower,而分区 1 的leader为服务器B,则服务器 A 和C 为服务器B 的follower,而分区 2 的leader 为C,则服务器A 和 B 为C 的follower。
AR: Assigned Replicas,分区中的所有副本的统称,包括leader和 follower,AR= lSR+ OSR
lSR:ln Sync Replicas,所有与leader副本保持同步的副本 follower和leader本身组成的集合,包括leader和 follower,是AR的子集
OSR:out-of-Sync Replied,所有与leader副本同步不能同步的 follower的集合,是AR的子集
5)分区和副本的优势
实现存储空间的横向扩容,即将多个kafka服务器的空间组合利用
提升性能,多服务器并行读写
实现高可用,每个分区都有一个主分区即 leader 分布在不同的kafka 服务器,并且有对应follower 分布在和leader不同的服务器上
6)kafka 写入消息的流程
我这里直接采用集群方式部署,如果需要单机部署,可以参考官网文档部署。
https://kafka.apache.org/quickstart
1)集群规划
主机名 | IP | 应用 | OS |
---|---|---|---|
node01 | 11.0.1.131 | zookeeper、kafka v_3.6.1 | Euler 21.10 LTS |
node02 | 11.0.1.136 | zookeeper、kafka v_3.6.1 | Euler 21.10 LTS |
node03 | 11.0.1.137 | zookeeper、kafka v_3.6.1 | Euler 21.10 LTS |
2)环境准备 ZooKeeper
zookeeper集群我这里已经提前准备好了,部署过程就省略了,有需要可以去参考 zookeeper 那篇文章。
[root@node01 ~]# /usr/local/zookeeper/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower [root@zk-02 ~]# /usr/local/zookeeper/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: leader [root@zk-03 ~]# /usr/local/zookeeper/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower
3)kafka 集群部署
我这里kafka集群和zookeeper集群复用三台服务器,如有其他需求,可以分开部署,注意都是需要安装java环境的。
【1】下载安装包,是编译好了,解开就能用
# 尽量下载scala版本为2.13的
https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
# kafka版本格式
kafka_<scala 版本>_<kafka 版本>
# 示例:kafka_2.13-2.7.0.tgz
【2】上传至各个节点
# 创建存放目录(个人习惯)
[root@node01 ~]# mkdir -p /home/weihu/src
[root@node02 ~]# mkdir -p /home/weihu/src
[root@node03 ~]# mkdir -p /home/weihu/src
[root@node01 ~]# ll /home/weihu/src/
总用量 109M
-rw-r--r-- 1 root root 109M 6月 16 14:51 kafka_2.13-3.6.1.tgz
[root@node02 ~]# ll /home/weihu/src/
总用量 109M
-rw-r--r-- 1 root root 109M 6月 16 14:51 kafka_2.13-3.6.1.tgz
[root@node03 ~]# ll /home/weihu/src/
总用量 109M
-rw-r--r-- 1 root root 109M 6月 16 14:45 kafka_2.13-3.6.1.tgz
【3】解压缩
[root@node01 src]# tar -xf kafka_2.13-3.6.1.tgz -C /usr/local/ [root@node01 src]# ll /usr/local/ 总用量 48K drwxr-xr-x. 2 root root 4.0K 6月 24 2021 bin drwxr-xr-x. 2 root root 4.0K 6月 24 2021 etc drwxr-xr-x. 2 root root 4.0K 6月 24 2021 games drwxr-xr-x. 2 root root 4.0K 6月 24 2021 include drwxr-xr-x 7 root root 4.0K 11月 24 2023 kafka_2.13-3.6.1 drwxr-xr-x. 2 root root 4.0K 6月 24 2021 lib drwxr-xr-x. 3 root root 4.0K 1月 23 18:51 lib64 drwxr-xr-x. 2 root root 4.0K 6月 24 2021 libexec drwxr-xr-x. 2 root root 4.0K 6月 24 2021 sbin drwxr-xr-x. 5 root root 4.0K 1月 23 18:51 share drwxr-xr-x. 2 root root 4.0K 6月 24 2021 src drwxr-xr-x 8 root root 4.0K 6月 14 21:48 zookeeper [root@node02 ~]# cd /home/weihu/src/ [root@node02 src]# tar -xf kafka_2.13-3.6.1.tgz -C /usr/local/ [root@node03 ~]# cd /home/weihu/src/ [root@node03 src]# tar -xf kafka_2.13-3.6.1.tgz -C /usr/local/
【4】改名或者创建软连接
[root@node01 src]# cd /usr/local/ [root@node01 local]# mv kafka_2.13-3.6.1 kafka [root@node01 local]# ll 总用量 48K drwxr-xr-x. 2 root root 4.0K 6月 24 2021 bin drwxr-xr-x. 2 root root 4.0K 6月 24 2021 etc drwxr-xr-x. 2 root root 4.0K 6月 24 2021 games drwxr-xr-x. 2 root root 4.0K 6月 24 2021 include drwxr-xr-x 7 root root 4.0K 11月 24 2023 kafka drwxr-xr-x. 2 root root 4.0K 6月 24 2021 lib drwxr-xr-x. 3 root root 4.0K 1月 23 18:51 lib64 drwxr-xr-x. 2 root root 4.0K 6月 24 2021 libexec drwxr-xr-x. 2 root root 4.0K 6月 24 2021 sbin drwxr-xr-x. 5 root root 4.0K 1月 23 18:51 share drwxr-xr-x. 2 root root 4.0K 6月 24 2021 src drwxr-xr-x 8 root root 4.0K 6月 14 21:48 zookeeper [root@node02 src]# cd /usr/local/ [root@node02 local]# mv kafka_2.13-3.6.1 kafka [root@node03 src]# cd /usr/local/ [root@node03 local]# mv kafka_2.13-3.6.1 kafka # 或者创建软连接 ln -s /usr/local/kafka_2.13-3.6.1 /usr/local/kafka
【5】配置环境变量(可选)
[root@node01 local]# echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node01 local]# . /etc/profile.d/kafka.sh
[root@node02 local]# echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node02 local]# . /etc/profile.d/kafka.sh
[root@node03 local]# echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node03 local]# . /etc/profile.d/kafka.sh
【6】kafka目录查看
[root@node01 local]# ll /usr/local/kafka/
总用量 72K
drwxr-xr-x 3 root root 4.0K 11月 24 2023 bin
drwxr-xr-x 3 root root 4.0K 11月 24 2023 config
drwxr-xr-x 2 root root 12K 6月 16 14:55 libs
-rw-r--r-- 1 root root 15K 11月 24 2023 LICENSE
drwxr-xr-x 2 root root 4.0K 11月 24 2023 licenses
-rw-r--r-- 1 root root 28K 11月 24 2023 NOTICE
drwxr-xr-x 2 root root 4.0K 11月 24 2023 site-docs
【7】配置文件详解
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required. # See kafka.server.KafkaConfig for additional details and defaults # ############################# Server Basics ############################# # broker 的全局唯一编号,不能重复,只能是数字。 broker.id=0 ############################# Socket Server Settings ############################# # 套接字服务器侦听的地址。如果未配置,主机名将等于的值 # java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 # 侦听器名称、主机名和代理将向客户端公布的端口。 # 如果未设置,则使用“listeners”的值。 #advertised.listeners=PLAINTEXT://your.host.name:9092 # 将侦听器名称映射到安全协议,默认情况下它们是相同的。有关更多详细信息,请参阅配置文档 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # 处理网络请求的线程数量(服务器用于从网络接收请求并向网络发送响应的线程数) num.network.threads=3 # 用来处理磁盘 IO 的线程数量 num.io.threads=8 # 发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 # 接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 # 请求套接字的缓冲区最大大小 socket.request.max.bytes=104857600 ############################# Log Basics ############################# # kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/usr/kafka/kafka_2.13-3.6.1/datas # 每个topic在当前 broker上的默认分区数。更多的分区允许更大的并行性以供使用,但这也会导致代理之间有更多的文件。 num.partitions=1 # 启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数。(用来恢复和清理 data 下数据的线程数量)对于数据目录位于RAID阵列中的安装,建议增加此值。 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # 每个 topic 创建时的副本数,默认时 1 个副本,对于开发测试以外的环境,建议使用大于1的值以确保可用性,如3 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # 强制将数据刷新到磁盘之前要接受的消息数 #log.flush.interval.messages=10000 # 在强制刷新之前,消息可以在日志中停留的最长时间 #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 # 基于大小的日志保留策略。除非剩余的 # segments下降到 log.retention.bytes 以下。独立于log.retention.hours的函数. #log.retention.bytes=1073741824 # 每个 segment 文件的最大大小,默认最大 1G ,当达到此大小时,将创建一个新的segment。 log.segment.bytes=1073741824 # 检查日志段以查看是否可以根据保留策略删除它们的间隔(检查过期数据的时间,默认 5 分钟检查一次是否数据过期) log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # Zookeeper集群连接字符串,一个以逗号分隔的'主机:端口'对,每个对对应一个zk服务器。可以在url中附加一个可选的chroot字符串,以指定所有kafka-znode的根目录。 zookeeper.connect=11.0.1.131:2181,11.0.1.136:2181,11.0.1.137:2181 # 连接到zookeeper 的超时时间(毫秒) zookeeper.connection.timeout.ms=18000 ############################# Group Coordinator Settings ############################# # 以下配置指定GroupCoordinator将延迟初始使用者重新平衡的时间(以毫秒为单位)。 # 随着新成员加入组,再平衡将进一步延迟group.initial.rebalance.delay.ms的值,最大值为max.poll.interval.ms。 # 默认值为3秒。 # 我们在这里将其覆盖为0,因为它为开发和测试提供了更好的开箱即用体验。 # 但是,在生产环境中,默认值3秒更合适,因为这将有助于避免在应用程序启动期间进行不必要的、可能代价高昂的重新平衡。 group.initial.rebalance.delay.ms=3
【8】修改配置文件
重要!!!
broker.id 一定要修改,每个broker 在集群中的唯一标识,正整数。
listeners 指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
# node01节点 [root@node01 config]# egrep -v "^#|^$" /usr/local/kafka/config/server.properties broker.id=1 listeners=PLAINTEXT://11.0.1.131:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/usr/local/kafka/data num.partitions=3 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=11.0.1.131:2181,11.0.1.136:2181,11.0.1.137:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 # node02节点 [root@node02 local]# egrep -v "^#|^$" /usr/local/kafka/config/server.properties broker.id=2 listeners=PLAINTEXT://11.0.1.136:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/usr/local/kafka/data num.partitions=3 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=11.0.1.131:2181,11.0.1.136:2181,11.0.1.137:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 # node03节点 [root@node03 local]# egrep -v "^#|^$" /usr/local/kafka/config/server.properties broker.id=3 listeners=PLAINTEXT://11.0.1.137:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/usr/local/kafka/data num.partitions=3 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=11.0.1.131:2181,11.0.1.136:2181,11.0.1.137:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
【9】可以调整内存,根据业务需求
vim /usr/local/kafka/bin/kafka-server-start.sh
....
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
....
【9】创建数据目录
mkdir /usr/local/kafka/data
【9】启动kafka集群
[root@node01 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@node02 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@node03 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
【10】查看服务状态
[root@node01 ~]# ss -ntl |grep 9092
LISTEN 0 50 [::ffff:11.0.1.131]:9092 *:*
[root@node02 ~]# ss -ntl |grep 9092
LISTEN 0 50 [::ffff:11.0.1.131]:9092 *:*
[root@node03 ~]# ss -ntl |grep 9092
LISTEN 0 50 [::ffff:11.0.1.131]:9092 *:*
1)客户端连接zookeeper集群,查看kafka的元数据是否已经写入
[root@node03 data]# /usr/local/zookeeper/bin/zkCli.sh -server 11.0.1.131:2181
....
[zk: 11.0.1.131:2181(CONNECTED) 12] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
2)kafka读写数据
kafka-topics.sh #消息的管理命令
kafka-console-producer.sh #生产者的模拟命令
kafka-console-consumer.sh #消费者的模拟命令
【1】获取所有Topic
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 11.0.1.136:9092
# --bootstrap-server 指向哪个kafka节点都可以
【2】创建Topic
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --topic test01 --bootstrap-server 11.0.1.136:9092 --partitions 3 --replication-factor 2
Created topic test01.
# --topic 主题名称
# --partitions 分区数
# --replication-factor 副本数
# 查看
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 11.0.1.136:9092 test01
[root@node01 ~]#
【3】查看Topic详细
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 11.0.1.136:9092 --topic test01 Topic: test01 TopicId: dfdpGX8FQJuXVQcLclmzaA PartitionCount: 3 ReplicationFactor: 2 Configs: Topic: test01 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 Topic: test01 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test01 Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3 # 状态说明:test01 有三个分区分别为0、1、2,分区0的leader是3 (broker.id),分区 0 有2 个副本,并且状态都为 Isr(ln-sync,表示可以参加选举成为 leader)。 # Replicas代表第几个服务器,在第一台服务器中,有0分区的分片和1分区的分片 # 查看数据目录也可以看出来 # test01是topic的名称,0和1就是副本,只不过看不出来是leader还是followers [root@node01 ~]# ls /usr/local/kafka/data/ cleaner-offset-checkpoint meta.properties replication-offset-checkpoint test01-1 log-start-offset-checkpoint recovery-point-offset-checkpoint test01-0 [root@node02 kafka]# ls /usr/local/kafka/data/ cleaner-offset-checkpoint meta.properties replication-offset-checkpoint test01-2 log-start-offset-checkpoint recovery-point-offset-checkpoint test01-1 [root@node03 data]# ls /usr/local/kafka/data/ cleaner-offset-checkpoint meta.properties replication-offset-checkpoint test01-2 log-start-offset-checkpoint recovery-point-offset-checkpoint test01-0
【4】生产 Topic(生产数据)
# 发送消息命令格式:
kafka-console-producer.sh --broker-list <kafkaIP1>:<端口>,<kafkaIP2>:<端口> --topic <topic名称>
# 产生数据,交互式输入消息,按Ctrl+C退出
[root@node01 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 11.0.1.131:9092 --topic test01
>hi01
>hello02
>wher
>
【5】消费Topic(消费数据)
注意点!!!
消息者先生产消息,消费者后续才启动,并且也能收到之前生产的消息
同一个消息在同一个group内的消费者只有被一个消费者消费,例如:共500条消息,在一个group内有A1,A2两个消费者,其中A1消费200条,A2消费另外的300条消息。从而实现负载均衡,不同group内的消费者则可以同时消费同一个消息
–from-beginning 表示消费前发布的消息也能收到,默认只能收到消费后发布的新消息
# 接收消息命令格式: kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称> --from-beginning --consumer-property group.id=<组名称> # 生产数据 [root@node01 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 11.0.1.131:9092 --topic test01 >hello >aaa >bbb > # 消费数据(只能收到消费发布后的新消息) [root@node02 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --topic test01 --bootstrap-server 11.0.1.136:9092 hello aaa bbb # 消费数据(费前发布的消息也能收到) [root@node02 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --topic test01 --bootstrap-server 11.0.1.136:9092 --from-beginning hi01 hello02 wher hello aaa bbb
扩展:生产数据和消费数据连接Kafka集群的任意节点都可以
【6】删除Topic
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --topic test02 --bootstrap-server 11.0.1.136:9092 --partitions 3 --replication-factor 2
Created topic test02.
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 11.0.1.136:9092
__consumer_offsets
test01
test02
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 11.0.1.136:9092 --topic test02
[root@node01 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 11.0.1.136:9092
__consumer_offsets
test01
[root@node01 ~]#
# topic 结构
/brokers/topics/[topic]
[zk: 11.0.1.131:2181(CONNECTED) 0] ls /brokers/topics
[__consumer_offsets, test01]
[zk: 11.0.1.131:2181(CONNECTED) 1] ls /brokers/topics/test01
[partitions]
[zk: 11.0.1.131:2181(CONNECTED) 2] ls /brokers/topics/test01/partitions
[0, 1, 2]
# partition结构
/brokers/topics/[topic]/partitions/[partitionId]/state
# broker信息
/brokers/ids/[o...N]
当需要读取kafka中的数据时,在服务器上查看比较麻烦,有时候数据量较大而且不是很直观。此时就需要一款简洁,使用方便的可视化工具了 — Offset Explorer
什么是 Offset Explorer?
Offset Explorer (之前称 Kafka Tool)是一款kafka的可视化工具,可以,查看kafka的topic ,partion数量,以及查看写入到kafa中的数据,整体页面非常简洁,使用起来也比较容易,他支持 mac ,windows,linux 服务器。
1)下载地址
我这里直接下载windows版
https://www.kafkatool.com/download.html
2)双击进行安装
3)安装完成后会让其添加kafka集群
4)输入kafka集群信息
1 集群名称,自定义
2 kafka集群地址(格式为host1:port1,host2:port2)
3 kafka版本
5)可以先测试能不能ping通
测试通以后直接连接即可
6)连接成功
7)查看创建的Topic、分区信息等
8)需要将数据格式改为文本,否则看到的数据是转成16进制的
9)查看数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。