赞
踩
Kafka是由scala所编写的一个消息中间件,因此安装Kafka需要依赖Zookeeper分布式协议、java开发环境以及Scala开发环境,而安装kafka-eagle依赖kafka和SiteDB或MySQL,因此安装kafka与kafka-eagle的步骤为:
安装配置java的开发环境
安装配置Scala的开发环境
安装配置Zookeeper集群
安装配置Kafka集群
安装配置MySQL或SiteDB
安装配置kafka-eagle
本文使用3台CentOS7的Linux节点进行配置,它们的主机和IP的映射如下:
10.10.10.1 host01
10.10.10.2 host02
10.10.10.3 host03
本文中上述组件的安装均在普通用户sxZhang
的账户上完成,安装路径为~/bigdata
。
因此在编辑环境变量和组件的配置文件时,读者应该根据自己的集群配置和安装路径,合理的修改相关信息。此外,本文假设读者已经安装配置过Java和scala、关闭了Linux节点的防火墙、配置了免密登录,并安装好了Zookeeper集群。
若读者未配置Zookeeper可查看笔者的另一篇博客:CentOS7安装Zookeeper
若读者防火墙和免密登录都未进行相应的配置,可以参考其它博主的博客:
免密登录可参考博客:Linux 配置SSH免密登录详细步骤
CentOS7关闭防火墙可参考:Centos7关闭防火墙详细教程
下载Kafka安装包
前往Apache Kafka下载页面,选择与自己操作系统和scala版本兼容的kafka安装包进行下载,之后使用xftp等拷贝到host01上。此处,笔者选择的Kafka安装包是kafka_2.12-3.5.2.tgz
。
解压Kafka安装包到目标路径并重命名
cd ~/bigdata/ #进入存放kafka安装包的目录下
tar -zvxf ./kafka_2.12-3.5.2.tgz #解压
mv kafka_2.12-3.5.2 kafka
修改kafka的相关配置文件
先在~/.bashrc
中添加kafka相关的环境变量:
export KAFKA_HOME=/home/sxZhang/bigdata/kafka
export PATH=$PATH:$KAFKA_HOME/bin
之后source ~/.bashrc
或新建ssh会话连接刷新环境变量,使用echo $KAFKA_HOME
能输出kafka的安装路径即成功。
修改完Kafka的环境变量后,我们还需要更新它的配置文件:
cd $KAFKA_HOME
mkdir kafka-logs #在kafka文件夹下新建一个名为kafka-logs的文件夹,后续指定为存放数据的文件夹
vim ./config/server.properties #编辑kafka的配置文件
使用vim编辑文件时,需要修改如下内容:
# broker.id的取值
broker.id=1
# 连接zookeeper的集群地址
zookeeper.connect=10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181
# kafka的监听端口
advertised.listeners=PLAINTEXT://10.10.10.1:9092
# kafka存储日志文件的文件夹
log.dirs=/home/sxZhang/bigdata/kafka/kafka-logs
需要注意的是,broker.id
在不同的节点上有不同的取值,例如在host01上的取值为1,在host01上的取值为3,建议该值与Zookeeper集群中的myid
一致;此外advertised.listeners
的IP也应该为当前节点的IP。
打包修改配置文件后的kafka文件夹分发到其它节点
# 1. 将修改配置文件后的kafka文件夹打包
tar -zcf kafka.tar.gz kafka
# 2. 将kafka压缩包发送到另外两个节点
scp kafka.tar.gz host01:~/bigdata/
scp kafka.tar.gz host02:~/bigdata/
按每个节点的myid
和IP修改配置文件
host02上需要修改的配置是:
# broker.id的取值
broker.id=2
# 连接zookeeper的集群地址
zookeeper.connect=10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181
# kafka的监听端口
advertised.listeners=PLAINTEXT://10.10.10.2:9092
host03上需要修改的配置是:
# broker.id的取值
broker.id=3
# 连接zookeeper的集群地址
zookeeper.connect=10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181
# kafka的监听端口
advertised.listeners=PLAINTEXT://10.10.10.3:9092
上述配置修改完成后,即可启动Kafka集群。
Kafka集群的启动与关闭
kafka集群在启动和关闭时,需要到每个节点上依次启动和关闭。
#1. 启动单个kafka集群的节点
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
#2. 停止单个kafka集群的节点
kafka-server-stop.sh
此时可以编写一个名为kafkacmd.sh
脚本用来批量启动和关闭集群中的Kafka进程:
#!/bin/bash cmd=$1 # 判断是否提供了输入参数 if [ -z "$1" ]; then echo "本脚本需要输入参数cmd" echo -e "cmd 在 {start, stop} 中选择:\n-start\t启动zk集群\n-stop\t关闭zk集群" exit 1 fi # 合法的命令集合 cmdlist=("start" "stop" "status") # 检查传入的命令是否合法 if [[ " ${cmdlist[@]} " =~ " ${cmd} " ]]; then # cmd合法,在zookeeper cluster上执行cmd for host in host{01..03} do echo 当前在${host}进行操作: if [ "$cmd" == "stop" ]; then echo "执行停止操作" ssh $host "kafka-server-stop.sh";echo elif [ "$cmd" == "start" ]; then echo "执行启动操作" ssh $host "kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties";echo else echo "未知命令: $cmd" fi done else # cmd不合法,在终端进行提示 echo "输入参数'$cmd'不合法,请从{start,stop}中选择命令" fi
编写完成后,执行chmod +x kafkacmd.sh
为该脚本添加可执行权限,之后可以拷贝到各节点中的KAFKA_HOME/bin
目录下,如果之前有正确的配置KAFkA_HOME
,并将其bin
目录成功的添加到了环境变量$PATH
中。
启动Kafka集群时执行:kafkacmd.sh start
需要注意的是:启动Kafka集群之前需要先启动Zookeeper集群
在启动Kafka集群后,遍历所有节点,使用jps
命令查看是否存在kafka守护进程判断集群是否启动成功:
for host in host{01..03}
do
echo 当前在${host}:
ssh $host "jps"; echo
done
若此集群启动成功,应该看到如下输出:
当前在host01:
129889 Kafka
129296 QuorumPeerMain
130985 Jps
当前在host02:
170723 Jps
169244 QuorumPeerMain
169820 Kafka
当前在host03:
10358 Kafka
9802 QuorumPeerMain
13564 Jps
关闭Kafka集群时只需执行:kafkacmd.sh stop
Kafka集群的相关操作
创建topic
kafka-topics.sh --bootstrap-server 10.10.10.1:9092 --create --topic test --partitions 16 --replication-factor 3
参数详解:
–bootstrap-server : kafka集群地址
–create :执行创创建操作
–topic :topic名称
–partitions :topic 的分区数
–replication-factor topic 的副本
启用Kafka自带的生产者
kafka-console-producer.sh --broker-list 10.10.10.1:9092,10.10.10.2:9092,10.10.10.3:9092 --topic test
参数详解:
–broker-list :存放topic相关数据的broker所在的kafka集群地址
在成功启用Kafka的生产者后,可以在命令行终端键入字符,之后敲击回车发送消息到对应的broker。
列出Kakfa服务器上存在的topic列表
kafka-topics.sh --list --bootstrap-server 10.10.10.1:9092,10.10.10.2:9092,10.10.10.3:9092
启用Kafka自带的消费者
kafka-console-consumer.sh --bootstrap-server 10.10.10.3:9092 --topic test --from-beginning
启用Kafka消费者后,会自动从topic对应的broker拉取消息,打印在命令行终端。
删除Kafka集群上指定的topic
kafka-topics.sh --bootstrap-server 10.10.10.1:9092,10.10.10.2:9092,10.10.10.3:9092 --delete --topic test
Kafka Eagle是一款用于监控和管理Apache Kafka的开源系统,它提供了完善的管理页面,例如Broker详情、性能指标趋势、Topic集合、消费者信息等。本文使用standalone部署模式在host01上对其进行安装部署。
下载Kafka-Eagle安装包
前往kafka-eagle的官方网站下载安装包,本文选取的版本是kafka-eagle-bin-3.0.1.tar
,下载该安装包后使用xftp工具传输到了~/bigdata
目录。
解压安装包并重命名
# 1. 初步解压 cd ~/bigdata tar -zvxf kafka-eagle-bin-3.0.1.tar.gz # 2. 进入解压后的目录 cd ./kafka-eagle-bin-3.0.1 ls #可以发现仅一个文件:efak-web-3.0.1-bin.tar.gz,还需要解压 # 3. 再次解压 tar -zvxf efak-web-3.0.1-bin.tar.gz ls #可以看到解压后的文件夹:efak-web-3.0.1 # 4. 重命名 mv efak-web-3.0.1 kafka-eagle # 5.移动到~/bigdata下 mv kafka-eagle ~/bigdata # 6.删除安装包和无用文件夹 rm -rf ~/bigdata/kafka-eagle-bin-3.0.1 rm ~/bigdata/kafka-eagle-bin-3.0.1.tar.gz
修改配置文件
先编辑~/.bashrc
修改环境变量,添加如下内容:
export KE_HOME=/home/sxZhang/bigdata/kafka-eagle
PATH=$PATH:$KE_HOME/bin
之后使用source ~/.bashrc
命令刷新环境变量。
另一个需要修改的配置文件是:$KE_HOME/conf/system-config.properties
cd $KE_HOME/conf
vim system-config.properties
使用上述命令进入kafka-eagle配置文件的编辑页面后,修改配置文件如下:
# Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here efak.zk.cluster.alias=cluster1 cluster1.zk.list=10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181 # Kafka broker nodes online list cluster1.efak.broker.size=10 cluster2.efak.broker.size=20 # Zkcli limit -- Zookeeper cluster allows the number of clients to connect to # If you enable distributed mode, you can set value to 4 or 8 kafka.zk.limit.size=8 # EFAK webui port -- WebConsole port access address efak.webui.port=8048 ###################################### # EFAK enable distributed ###################################### efak.distributed.enable=false # master worknode set status to master, other node set status to slave efak.cluster.mode.status=slave # deploy efak server address efak.worknode.master.host=localhost efak.worknode.port=8085 # Kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option cluster1.efak.offset.storage=kafka # Whether the Kafka performance monitoring diagram is enabled efak.metrics.charts=false # EFAK keeps data for 30 days by default efak.metrics.retain=30 # If offset is out of range occurs, enable this property -- Only suitable for kafka sql efak.sql.fix.error=false efak.sql.topic.records.max=5000 # Delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete efak.topic.token=keadmin # Kafka sasl authenticate cluster1.efak.sasl.enable=false cluster1.efak.sasl.protocol=SASL_PLAINTEXT cluster1.efak.sasl.mechanism=SCRAM-SHA-256 cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret"; # If not set, the value can be empty cluster1.efak.sasl.client.id= # Add kafka cluster cgroups cluster1.efak.sasl.cgroup.enable=false # Default use sqlite to store data #efak.driver=org.sqlite.JDBC # It is important to note that the '/hadoop/kafka-eagle/db' path must be exist. #efak.url=jdbc:sqlite:/home/sxZhang/bigdata/kafka-eagle/db #efak.username=root #efak.password=smartloli # (Optional) set mysql address efak.driver=com.mysql.jdbc.Driver efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=MySQL5.7
在配置文件中,需要重点注意和修改的配置参数有:
efak.zk.cluster.alias
:该参数可以设置多个zk、kafka集群的别称,每个别称使用逗号分割。
cluster1.zk.list
:cluster1是efak.zk.cluster.alias
中指定的别称,zk.list
顾名思义,就是用来指定cluster1集群对应主机的IP和端口列表的。
efak.webui.port
:访问监控界面的WebUI时连接的端口,使用默认值即可。`
efak.driver
、efak.url
、efak.username
、efak.password
:这4个参数用于设置Kafka-Eagle
所用的底层数据库,可以使用Sqllite、Mysql、hadoop等进行存储,且只能选择一个,本文使用了Mysql进行存储。
假设读者没配置mysql,可尝试sqllite的数据库选项,如果尝试sqllite不太成功,可尝试配置mysql,配置mysql时开参考博主的另一篇博客:CentOS7使用yum从国内镜像源安装MySQL5.7-CSDN博客
启动Kafka-Eagle
执行如下命令:
cd $KE_HOME/bin/
chmod +x ke.sh
./ke.sh start
可以看到如下输出:
[2023-12-15 17:52:15] INFO: Port Progress: [##################################################] | 100% [2023-12-15 17:52:18] INFO: Config Progress: [##################################################] | 100% [2023-12-15 17:52:22] INFO: Startup Progress: [##################################################] | 100% [2023-12-15 17:52:11] INFO: Status Code[0] [2023-12-15 17:52:11] INFO: [Job done!] Welcome to ______ ______ ___ __ __ / ____/ / ____/ / | / //_/ / __/ / /_ / /| | / ,< / /___ / __/ / ___ | / /| | /_____/ /_/ /_/ |_|/_/ |_| ( Eagle For Apache Kafka® ) Version v3.0.1 -- Copyright 2016-2022 ******************************************************************* * EFAK Service has started success. * Welcome, Now you can visit 'http://10.10.10.1:8048' * Account:admin ,Password:123456 ******************************************************************* * <Usage> ke.sh [start|status|stop|restart|stats] </Usage> * <Usage> https://www.kafka-eagle.org/ </Usage> *******************************************************************
到WebUI查看是否启动成功
打开浏览器,输入网址http://10.10.10.1:8048
,进入登录界面
之后输入账号:amdin
,以及密码:123456
,进入Kafka-Eagle监控界面的主页。
可以在该监控页面看到集群的情况和硬件资源的使用情况。
至此,Kafka-Eagle安装成功。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。