赞
踩
本篇文章继续给大家介绍ELFK日志分析,我们先前介绍了ELFK架构,zookeeper部署使用,kafka的部署,仅差kafka使用就将整个体系融汇贯通了。我们本篇文章将以kafka为核心,详细介绍kafka使用,最终将kafka融入ELFK架构中,大致内容见下面目录。
目录
kafka学习使用中涉及许多原理,了解这些原理会让学习事半功倍。
kafka cluster是分布式消息传递系统,与MQ cluster(消息队列)类似,由broker list(kafka运行的节点)和多个topic(主题)组成,在Kafka中,每个topic被细分为多个partition,而每个partition又可以被副本到多个kafka broker上实现高可用性。因此,kafka cluster是由多个broker节点和多个topic partition组成的。
producer是生产者角色,主要负责生产数据,是向kafka cluster写入数据的一方,数据的写入有两种常见策略,要么是rr算法,要么是基于key的hash值和分区数取余。
consumer是消费者角色,主要是负责消费数据,是从kafka cluster拉取数据的一方。
topic是主题,是数据存储的逻辑单元。
replica是副本,实际存储数据的地方,一个topic最少要有一个副本。
partition是分区,一个topic最少要有一个分区,正常情况下有多个分区编号的。副本是分区的实际载体。
consumer group是消费者组,一个消费者组里面最少有一个消费者,同一个消费者组的消费者不能同时消费同一个topic的partition,以免造成数据重复消费;当一个消费者组的消费者数据发生变化时,会触发rebalance(重平衡)机制,即重新分配分区消费。
ISR是和leader数据同步的所有副本集合
OSR是和leader数据不同步的所有副本集合
AR是ISR和OSR的集合,就是所有的副本集合
假如30秒内leader和follower数据的LEO(Log End Offset)一致,则认为数据一致,当follower的数据还没有与leader完全同步时,leader节点宕机了,此时,follower选举出新的leader,其他的follower会跟随这个leader数据继续工作,如果之前leader恢复了,会从之前的HW(高水位线,ISR中最后一个副本最小的LEO)开始重新写数据,与新的leader同步,之前follower没同步的就丢失了,若30秒内,副本没有和leader的LEO相同,会直接踢出ISR,进入OSR列表。
1、先启动zookeeper
- [root@ELK103 ~]# manager_zk.sh start
- 启动服务
- ========== elk101 zkServer.sh start ================
- Starting zookeeper ... STARTED
- ========== elk102 zkServer.sh start ================
- Starting zookeeper ... STARTED
- ========== elk103 zkServer.sh start ================
- Starting zookeeper ... STARTED
- [root@ELK103 ~]# manager_zk.sh status
- 查看状态
- ========== elk101 zkServer.sh status ================
- Client port found: 2181. Client address: localhost. Client SSL: false.
- Mode: follower
- ========== elk102 zkServer.sh status ================
- Client port found: 2181. Client address: localhost. Client SSL: false.
- Mode: leader
- ========== elk103 zkServer.sh status ================
- Client port found: 2181. Client address: localhost. Client SSL: false.
- Mode: follower
2、模仿zookeeper启动脚本写一个kafka启动脚本,并给予执行权限
- [root@ELK101 ~]# cat /usr/local/sbin/manager_kafka.sh
- #!/bin/bash
-
- #判断用户是否传参
- if [ $# -ne 1 ];then
- echo "无效参数,用法为: $0 {start|stop|restart|status}"
- exit
- fi
-
- #获取用户输入的命令
- cmd=$1
-
- #定义函数功能
- function kafkaManger(){
- case $cmd in
- start)
- echo "启动服务"
- remoteExecution start
- ;;
- stop)
- echo "停止服务"
- stopKafka stop
- ;;
- restart)
- echo "重启服务"
- remoteExecution restart
- ;;
- status)
- echo "查看状态"
- remoteExecution status
- ;;
- *)
- echo "无效参数,用法为: $0 {start|stop|restart|status}"
- ;;
- esac
- }
-
-
- #定义执行的命令
- function remoteExecution(){
- for (( i=101 ; i<=103 ; i++ )) ; do
- tput setaf 2
- echo ========== elk${i} kafaka-server-start.sh $1 ================
- tput setaf 9
- ssh elk${i} "kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
- done
- }
-
- function stopKafka(){
- for (( i=101 ; i<=103 ; i++ )) ; do
- tput setaf 2
- echo ========== elk${i} kafaka-server-stop.sh $1 ================
- tput setaf 9
- ssh elk${i} "kafka-server-stop.sh -daemon $KAFKA_HOME/config/server.properties"
- done
-
- }
-
-
- #调用函数
- kafkaManger
- [root@ELK101 ~]# chmod +x /usr/local/sbin/manager_kafka.sh
- [root@ELK101 ~]# ll /usr/local/sbin/manager_kafka.sh
- -rwxr-xr-x 1 root root 1323 Jun 5 11:29 /usr/local/sbin/manager_kafka.sh
3、通过启动脚本启动kafka
- [root@ELK101 ~]# manager_kafka.sh start
- 启动服务
- ========== elk101 kafaka-server-start.sh start ================
- ========== elk102 kafaka-server-start.sh start ================
- ========== elk103 kafaka-server-start.sh start ================
1、增
- [root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create --topic koten
- Created topic koten. #创建一个名为koten的topic
-
- [root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create --topic koten-3 --partitions 3
- Created topic koten-3. #创建一个名为koten-3的topic,分区数为3
-
- [root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create --topic koten-3-2 --partitions 3 --replication-factor 2
- Created topic koten-3-2. #创建一个名为koten-3-2的topic,分区数为3,副本数为2
2、查
- #查看topic列表
- [root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --list
- koten
- koten-3
- koten-3-2
-
- #查看所有topic的详细信息
- [root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe
- Topic: koten-3-2 TopicId: 1l4P-Tv_Q3aTasKediU3MQ PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
- Topic: koten-3-2 Partition: 0 Leader: 103 Replicas: 103,102 Isr: 103
- Topic: koten-3-2 Partition: 1 Leader: 102 Replicas: 102,101 Isr: 102,101
- Topic: koten-3-2 Partition: 2 Leader: 101 Replicas: 101,103 Isr: 101,103
- Topic: koten TopicId: eXxgjWBySxe_WAx-fv83ZA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
- Topic: koten Partition: 0 Leader: 103 Replicas: 103 Isr: 103
- Topic: koten-3 TopicId: l7L3SY63QV-ayXOD46bViQ PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
- Topic: koten-3 Partition: 0 Leader: 103 Replicas: 103 Isr: 103
- Topic: koten-3 Partition: 1 Leader: 102 Replicas: 102 Isr: 102
- Topic: koten-3 Partition: 2 Leader: 101 Replicas: 101 Isr: 101
-
- #查看指定topic的详细信息
- oot@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe --topic koten
- Topic: koten TopicId: eXxgjWBySxe_WAx-fv83ZA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
- Topic: koten Partition: 0 Leader: 103 Replicas: 103 Isr: 103
3、改
主分片数可以用命令行直接修改,副本数修改比较麻烦,需要用json格式,可以参考这个大神的链接:https://www.cnblogs.com/yinzhengjie/p/9808125.html
- #修改koten的topic分区为5个
- [root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --alter --topic koten --partitions 5
4、删
[root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --delete --topic koten
1、启动生产者
[root@ELK101 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic koten-3
2、启动消费者(在同一主机)
表示从最新的offset拉取数据
[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic koten-3
表示从头开始拉取数据
[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic koten-3 --from-beginning
1、查看现有的消费者组
- [root@ELK101 ~]# kafka-consumer-groups.sh --bootstraerver 10.0.0.101:9092 --list
- console-consumer-24702
- console-consumer-58734
- console-consumer-41114
2、列出所有消费者组的详细信息,包括偏移量,消费者ID,LEO等信息
- [root@ELK101 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --all-groups
-
- Consumer group 'console-consumer-24702' has no active members.
-
- Consumer group 'console-consumer-41114' has no active members.
-
- GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
- console-consumer-58734 koten-3 0 - 1 - console-consumer-f92294eb-383c-402b-9f9e-9a7ac5773b7d /10.0.0.101 console-consumer
- console-consumer-58734 koten-3 1 - 2 - console-consumer-f92294eb-383c-402b-9f9e-9a7ac5773b7d /10.0.0.101 console-consumer
- console-consumer-58734 koten-3 2 - 3 - console-consumer-f92294eb-383c-402b-9f9e-9a7ac5773b7d /10.0.0.101 console-consumer
3、查看内置topic的offset数据,了解即可,我这边运行没有显示内容
[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
4、基于配置文件指定消费组
- [root@ELK101 ~]# cat $KAFKA_HOME/config/consumer.properties
- ......
- group.id=koten-consumer-group
- ......
- [root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092 --topic koten-topic --consumer.config $KAFKA_HOME/config/consumer.properties
- [2023-06-05 20:25:33,829] WARN [Consumer clientId=console-consumer, groupId=koten-consumer-group] Error while fetching metadata with correlation id 2 : {koten-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
5、基于命令行参数指定消费者组
[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092 --topic koten-topic --consumer-property group.id=koten-consumer-group
生产环境建议调到2G~4G
1、查zookeeper的堆内存大小
- [root@ELK101 ~]# jmap -heap `jps | awk '/QuorumPeerMain/{print $1}'` | grep MaxHeapSize
- MaxHeapSize = 1048576000 (1000.0MB)
- [root@ELK101 ~]#
2、修改堆内存的大小
- [root@ELK101 ~]# cat > /koten/softwares/apache-zookeeper-3.8.1-bin/conf/java.env <<'EOF'
- #!/bin/bash
- # 指定JDK的按住路径
- export JAVA_HOME=/koten/softwares/jdk1.8.0_291
-
- # 指定zookeeper的堆内存大小
- export JVMFLAGS="-Xms128m -Xmx128m $JVMFLAGS"
- EOF
3、将配置文件同步到集群的其他zk节点上
- [root@ELK101 ~]# data_rsync.sh /koten/softwares/apache-zookeeper-3.8.1-bin/conf/java.env
- ===== rsyncing elk102: java.env =====
- 命令执行成功!
- ===== rsyncing elk103: java.env =====
- 命令执行成功!
4、重启zk集群,注意一定要重启后堆内存才生效,manager_zk脚本不好用,有时候停止了,进程还存在,需要手动挨个运行zkServer.sh stop
- [root@ELK101 ~]# manager_zk.sh restart
- 重启服务
- ========== elk101 zkServer.sh restart ================
- Stopping zookeeper ... STOPPED
- Starting zookeeper ... STARTED
- ========== elk102 zkServer.sh restart ================
- Stopping zookeeper ... STOPPED
- Starting zookeeper ... STARTED
- ========== elk103 zkServer.sh restart ================
- Stopping zookeeper ... STOPPED
- Starting zookeeper ... STARTED
5、验证堆内存
- [root@ELK102 ~]# jmap -heap `jps | awk '/QuorumPeerMain/{print $1}'` | grep MaxHeapSize
- MaxHeapSize = 268435456 (256.0MB)
1、查看堆内存大小
- [root@ELK101 ~]# jmap -heap `jps | awk '/Kafka/{print $1}'` | grep MaxHeapSize
- MaxHeapSize = 1073741824 (1024.0MB)
2、修改堆内存(生产环境5~6G最佳)
捎带启动了JXM端口
- [root@ELK101 ~]# cat `which kafka-server-start.sh`
- ......
- if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
- # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
- export KAFKA_HEAP_OPTS="-server -Xmx256M -Xms256M -
- XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
- -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:Initiat
- ingHeapOccupancyPercent=70"
- export JMX_PORT="8888"
- fi
- ......
3、单点重启kafka,查看堆内存大小
- [root@ELK101 ~]# kafka-server-stop.sh
- [root@ELK101 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
- [root@ELK101 ~]# jmap -heap `jps | awk '/Kafka/{print $1}'` | grep MaxHeapSize
- MaxHeapSize = 268435456 (256.0MB)
4、将启动脚本同步到其他节点
- [root@ELK101 ~]# data_rsync.sh `which kafka-server-start.sh`
- ===== rsyncing elk102: kafka-server-start.sh =====
- 命令执行成功!
- ===== rsyncing elk103: kafka-server-start.sh =====
- 命令执行成功!
5、其他节点重启kafka环境,查看堆内存是否生效
- [root@ELK101 ~]# manager_kafka.sh stop
- 停止服务
- ========== elk101 kafaka-server-stop.sh stop ================
- ========== elk102 kafaka-server-stop.sh stop ================
- ========== elk103 kafaka-server-stop.sh stop ================
- [root@ELK101 ~]# manager_kafka.sh start
- 启动服务
- ========== elk101 kafaka-server-start.sh start ================
- ========== elk102 kafaka-server-start.sh start ================
- ========== elk103 kafaka-server-start.sh start ================
- [root@ELK102 ~]# jmap -heap `jps | awk '/Kafka/{print $1}'` | grep MaxHeapSize
- MaxHeapSize = 268435456 (256.0MB)
- [root@ELK103 ~]# jmap -heap `jps | awk '/Kafka/{print $1}'` | grep MaxHeapSize
- MaxHeapSize = 268435456 (256.0MB)
图形化的方式管理kafka
1、启动kafka的JMX端口
与上面修改堆内存步骤相同,略。
2、启动zookeeper的JMX端口
- [root@ELK101 ~]# cat /koten/softwares/apache-zookeeper-3.8.1-bin/bin/zkEnv.sh
- # zookeeper JMX
- JMXLOCALONLY=false
- JMXHOSTNAME=10.0.0.101
- JMXPORT=9999
- JMXSSL=false
- JMXLOG4J=false
3、安装mysql,启动服务并设置开机自启动
- [root@ELK101 ~]# yum -y install mariadb-server
- [root@ELK101 ~]# cat /etc/my.cnf
- [mysqld]
- ......
- skip-name-resolve=1 #跳过名称解析,不跳过再进行登录的时候,可能会进行反向解析
- [root@ELK101 ~]# systemctl enable --now mariadb
- Created symlink from /etc/systemd/system/multi-user.target.wants/mariadb.service to /usr/lib/systemd/system/mariadb.service.
4、创建数据库与授权用户
- [root@ELK101 ~]# mysql
- Welcome to the MariaDB monitor. Commands end with ; or \g.
- Your MariaDB connection id is 2
- Server version: 5.5.68-MariaDB MariaDB Server
-
- Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
-
- Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
-
- MariaDB [(none)]> CREATE DATABASE koten_kafka DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
- Query OK, 1 row affected (0.00 sec)
-
- MariaDB [(none)]> CREATE USER admin IDENTIFIED BY 'koten';
- Query OK, 0 rows affected (0.00 sec)
-
- MariaDB [(none)]> GRANT ALL ON koten_kafka.* TO admin;
- Query OK, 0 rows affected (0.00 sec)
-
- MariaDB [(none)]> SHOW GRANTS FOR admin;
- +------------------------------------------------------------------------------------------------------+
- | Grants for admin@% |
- +------------------------------------------------------------------------------------------------------+
- | GRANT USAGE ON *.* TO 'admin'@'%' IDENTIFIED BY PASSWORD '*87F5F6FF9376D7C33FEB4C2AA1F7F99E9853F2DB' |
- | GRANT ALL PRIVILEGES ON `koten_kafka`.* TO 'admin'@'%' |
- +------------------------------------------------------------------------------------------------------+
- 2 rows in set (0.00 sec)
-
- MariaDB [(none)]> quit
- Bye
5、测试用户
- [root@ELK101 ~]# mysql -u admin -pkoten -h 10.0.0.101
- Welcome to the MariaDB monitor. Commands end with ; or \g.
- Your MariaDB connection id is 3
- Server version: 5.5.68-MariaDB MariaDB Server
-
- Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
-
- Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
-
- MariaDB [(none)]> show databases;
- +--------------------+
- | Database |
- +--------------------+
- | information_schema |
- | koten_kafka |
- | test |
- +--------------------+
- 3 rows in set (0.00 sec)
-
- MariaDB [(none)]> quit
- Bye
1、下载kafka-eagle软件,下面的链接下载的慢可以用我分享在文末的链接
[root@ELK101 ~]# wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.0.8.tar.gz
2、解压软件包
- [root@ELK101 ~]# unzip kafka-eagle-bin-2.0.8.zip
- Archive: kafka-eagle-bin-2.0.8.zip
- inflating: efak-web-2.0.8-bin.tar.gz
- inflating: system-config.properties
- [root@ELK101 ~]# tar xf efak-web-2.0.8-bin.tar.gz -C /koten/softwares/
3、修改配置文件
- [root@ELK101 ~]# cat /koten/softwares/efak-web-2.0.8/conf/system-config.properties
- efak.zk.cluster.alias=kafka
- kafka.zk.list=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/kafka-3.2.1 #注意chroot与kafka配置文件保持一致
- kafka.efak.broker.size=20
- kafka.zk.limit.size=16
- efak.webui.port=8048
- kafka.efak.offset.storage=zk
- kafka.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
- efak.metrics.charts=true
- efak.metrics.retain=15
- efak.sql.topic.records.max=5000
- efak.sql.topic.preview.records.max=10
- efak.topic.token=koten
- efak.driver=com.mysql.cj.jdbc.Driver
- efak.url=jdbc:mysql://10.0.0.101:3306/koten_kafka?useUnicode=true&characterEncodi
- ng=UTF-8&zeroDateTimeBehavior=convertToNull
- efak.username=admin
- efak.password=koten #数据库密码
4、配置环境变量
- [root@ELK101 ~]# cat >> /etc/profile.d/kafka.sh <<'EOF'
- export KE_HOME=/koten/softwares/efak-web-2.0.8
- export PATH=$PATH:$KE_HOME/bin
- EOF
- [root@ELK101 ~]# source /etc/profile.d/kafka.sh
5、修改配置启动脚本的堆内存大小
- [root@ELK101 ~]# sed -i '/KE_JAVA_OPTS/s#2g#256m#g' $KE_HOME/bin/ke.sh | grep KE_JAVA_OPTS
- [root@ELK101 ~]# grep KE_JAVA_OPTS $KE_HOME/bin/ke.sh
- export KE_JAVA_OPTS="-server -Xmx256m -Xms256m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
6、启动服务
- [root@ELK101 ~]# ke.sh start
- ......
- [2023-06-05 22:36:46] INFO: [Job done!]
- Welcome to
- ______ ______ ___ __ __
- / ____/ / ____/ / | / //_/
- / __/ / /_ / /| | / ,<
- / /___ / __/ / ___ | / /| |
- /_____/ /_/ /_/ |_|/_/ |_|
- ( Eagle For Apache Kafka® )
-
- Version 2.0.8 -- Copyright 2016-2021
- *******************************************************************
- * EFAK Service has started success.
- * Welcome, Now you can visit 'http://10.0.0.101:8048'
- * Account:admin ,Password:123456
- *******************************************************************
- * <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
- * <Usage> https://www.kafka-eagle.org/ </Usage>
- *******************************************************************
7、登录eagle
8、忘记密码后可以进入数据库去找
- MariaDB [koten_kafka]> SELECT * FROM koten_kafka.ke_users;
- +----+-------+----------+-----------+-----------------+---------------+
- | id | rtxno | username | password | email | realname |
- +----+-------+----------+-----------+-----------------+---------------+
- | 1 | 1000 | admin | 123456 | admin@email.com | Administrator |
- +----+-------+----------+-----------+-----------------+---------------+
- 1 row in set (0.00 sec)
-
- MariaDB [koten_kafka]>
9、登陆进去后可以看到监控的kafka与zookeeper的一些数据信息
查看仪表盘
查看zookeeper与kafka的监控信息
10、不止可以监控数据,还可以创建topic,对kafka集群进行一些操作
对kafka集群进行压力测试,方便我们了解集群的处理上限,可以作为集群调优和扩容的依据。
测试之前要先搞懂链路,如果你的生产者与kafka集群不在一个地方,那么你在一个主机进行压力测试也没有意义,确定好链路后,在实际的生产者和消费者执行压测,修改对应的主机参数即可。
1、通过下面脚本进行压测
- mkdir /tmp/kafka-test
-
- cat > kafka-test.sh <<'EOF'
- # 创建topic
- kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic kafka-2023 --replication-factor 1 --partitions 10 --create
-
- # 启动消费者消费数据
- nohup kafka-consumer-perf-test.sh --broker-list 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic kafka-2023 --messages 100000 &>/tmp/kafka-test/kafka-consumer.log &
-
- # 启动生产者写入数据
- nohup kafka-producer-perf-test.sh --num-records 100000 --record-size 1000 --topic kafka-2023 --throughput 1000 --producer-props bootstrap.servers=10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 &> /tmp/kafka-test/kafka-producer.log &
- EOF
-
- bash kafka-test.sh
-
-
- #可以根据自己要测的实际生产情况调整以下参数
-
- kafka-consumer-perf-test.sh
-
- ---messages: 指定消费消息的数量。
- --broker-list: 指定broker列表。
- --topic: 指定topic主体。
-
- kafka-producer-perf-test.sh
-
- -num-records:生产消息的数量。
- --record-size: 每条消息的大小,单位是字节。
- --topic: 指定topic主体。
- --throughput: 设置每秒发送的消息数量,即指定最大消息的吞吐量,若设置为-1代表不限制!
- --producer-props bootstrap.servers: 指定broker列表
2、可以通过efak查看实施进度
3、也可以通过脚本输出的日志观察生产和消费速度
filebeat对接kafka,如果filebeat作为生产者,kafka作为消费者,可以经过kafka后再写入到es集群;filebeat作为消费者,可以读取到kafka的数据,将kafka的数据展示出来,或者写入es集群。下面给大家展示下示例。
filebeat作为生产者,所以是output到kafka,input我们就用stdin去测试。
1、kafka启动消费者
[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic filebeat-to-kafka
2、执行filebeat
- [root@ELK101 config]# cat 01-stdin-to-kafka.yaml
- filebeat.inputs:
- - type: stdin
-
- output.kafka:
- hosts: ["10.0.0.101:9092", "10.0.0.102:9092", "10.0.0.103:9092"]
- topic: 'filebeat-to-kafka'
-
- # 执行filebeat并输入测试数据
- [root@ELK101 config]# filebeat -e -c config/01-stdin-to-kafka.yaml
- ...
- 1234567
-
- # kafka会消费到数据
- [root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic filebeat-to-kafka
- [2023-12-18 15:51:33,774] WARN [Consumer clientId=console-consumer, groupId=console-consumer-60747] Error while fetching metadata with correlation id 2 : {filebeat-to-kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
- {"@timestamp":"2023-12-18T07:54:06.389Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.5"},"ecs":{"version":"1.12.0"},"log":{"offset":0,"file":{"path":""}},"message":"1234567","input":{"type":"stdin"},"host":{"name":"ELK101"},"agent":{"id":"8fa0a9d7-f6d8-45b8-9355-ddf800e337fa","name":"ELK101","type":"filebeat","version":"7.17.5","hostname":"ELK101","ephemeral_id":"cef5e36c-ef9b-4c38-91d9-54f7c4db48fe"}}
1、启动filebeat进行消费
- [root@ELK101 config]# cat 02-kafka-to-filebeat.yaml
- filebeat.inputs:
- - type: kafka
- hosts:
- - 10.0.0.101:9092
- - 10.0.0.102:9092
- - 10.0.0.103:9092
- topics: ["kafka-to-filebeat"]
- group_id: "filebeat"
-
- output.console:
- pretty: true
-
- [root@ELK101 config]# filebeat -e -c config/02-kafka-to-filebeat.yaml
2、启动kafka进行生产
- [root@ELK101 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic kafka-to-filebeat
- >123456ceshi
- >
-
-
- [root@ELK101 config]# filebeat -e -c config/02-kafka-to-filebeat.yaml
- ...
- {
- "@timestamp": "2023-12-18T09:06:52.226Z",
- "@metadata": {
- "beat": "filebeat",
- "type": "_doc",
- "version": "7.17.5"
- },
- "ecs": {
- "version": "1.12.0"
- },
- "host": {
- "name": "ELK101"
- },
- "agent": {
- "version": "7.17.5",
- "hostname": "ELK101",
- "ephemeral_id": "d4c29b42-d892-4532-bbe6-cff5f5a243f9",
- "id": "8fa0a9d7-f6d8-45b8-9355-ddf800e337fa",
- "name": "ELK101",
- "type": "filebeat"
- },
- "kafka": {
- "partition": 0,
- "offset": 0,
- "key": "",
- "headers": [],
- "topic": "kafka-to-filebeat"
- },
- "message": "123456ceshi",
- "input": {
- "type": "kafka"
- }
- }
1、kafaka开始消费
[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic logstash-to-kafka
2、启动logstash开始生产
- [root@ELK101 logstash_cofig]# cat 01-logstash-to-kafka.yaml
- input {
- stdin {
- }
- }
-
- output {
- kafka {
- bootstrap_servers => "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"
- topic_id => "logstash-to-kafka"
- }
- }
-
- # 启动logstash并手动写入数据
- [root@ELK101 logstash_cofig]# logstash -r -f 01-logstash-to-kafka.yaml
- ...
- ceshi123
-
- # kafka可以消费到数据
- [root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic logstash-to-kafka
- ...
- 2023-12-18T08:04:25.969Z ELK101
- 2023-12-18T08:05:03.169Z ELK101 ceshi123
1、启动logstash进行消费
- [root@ELK101 logstash_cofig]# cat 02-kafka-to-logstasg.yaml
- input {
- kafka {
- bootstrap_servers => "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"
- topics => "kafka-to-logstash"
- }
- }
-
- output {
- stdout {}
- }
- [root@ELK101 logstash_cofig]# logstash -r -f 02-kafka-to-logstasg.yaml
2、启动kafka进行生产
- [root@ELK101 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic kafka-to-logstash
- >123
- >
-
- [root@ELK101 logstash_cofig]# logstash -r -f 02-kafka-to-logstasg.yaml
- ...
- {
- "@version" => "1",
- "@timestamp" => 2023-12-18T09:19:51.170Z,
- "message" => "123"
- }
kafka图形化管理工具下载链接:https://pan.baidu.com/s/1_xciM_6OC0383f11phRycQ?pwd=j7ki
我是koten,10年运维经验,持续分享运维干货,感谢大家的阅读和关注!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。