赞
踩
1、kafka集群的安装配置依赖zookeeper,搭建kafka集群之前,需要搭建好zookeeper集群
2、需要有jdk环境
解压安装包
tar -zxf kafka_2.12-2.7.0.tgz -C /opt/module/
修改权限
sudo chown -R bigdata:bigdata /opt/module/kafka_2.12-2.7.0
创建数据存放目录和日志目录
mkdir -p /data/kafka/data
mkdir -p /data/log/kafka
修改配置
vi /opt/module/kafka_2.12-2.7.0/config/server.properties
broker.id=2 #broker的全局唯一编号不能重复,建议与zookeeper的myid对应 listeners=PLAINTEXT://$IP:9092 #broker监听ip和端口 0.0.0.0:9092则表示所有服务器均可以 advertised.listeners=PLAINTEXT://$IP:9092 #设置好才能被所有服务器监听 num.network.threads=3 #borker进行网络处理的线程数 num.io.threads=8 #borker进行I/O处理的线程数 socket.send.buffer.bytes=102400 #发送缓冲区大小,即发送消息先发送到缓冲区,当缓冲区满了在一起发出去 socket.receive.buffer.bytes=102400 #接收缓冲区大小,接收消息先放到接收缓冲区,当达到这个数量时同步到磁盘 socket.request.max.bytes=104857600 #向kafka套接字请求的最大字节数量,防止服务器outofmemory,大小最好不要超过java的堆栈大小 log.dirs=/data/kafka/data #消息存放目录,不是日志目录(数据日志分离) kafka.log4j.dir=/data/log/kafka #日志目录 num.partitions=3 #每个topic的默认分区数 num.recovery.threads.per.data.dir=1 #处理消息目录的线程数,若设置了3个消息路径,改参数为2,那么一共需要6个线程 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 #消息过期时间,默认为1周 log.segment.bytes=1073741824 #日志文件中每个segment的大小,默认为1G,topic的分区是以一堆segment文件存储的,超过此限制会建立一个新的日志文件。此参数若在创建topic时的指定,那么参数覆盖,以指定的为准 log.retention.check.interval.ms=300000 #如上设置了每个segment文件大小为1G,那么此时间间隔就是检查他的大小有没有达到1G,检查的时间间隔 zookeeper.connect=node2:2181,node3:2181,node4:2181/kafka #后面带上kafka,kafka在zookeeper上的配置都会在/kafka目录下 zookeeper.connection.timeout.ms=18000 #默认6000 group.initial.rebalance.delay.ms=0
其中kafka相关线程配置可根据实际主机cpu情况做调整
num.network.threads=3
num.io.threads=8
num.replica.fetchers=1
配置文件解释:
log.dirs:
Kafka 把所有消息都保存在磁盘上,存放这些日志片段的目录是通过 log.dirs 指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
num.recovery.threads.per.data.dir:
对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段:
• 服务器正常启动,用于打开每个分区的日志片段;
• 服务器崩溃后重启,用于检查和截短每个分区的日志片段;
• 服务器正常关闭,用于关闭日志片段
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的
服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。
修改Kafka日志输出位置
log4j.properties与server.properties配置kafka.log4j.dir不生效
需调整./bin/kafka-run-class.sh
vim /opt/module/kafka_2.12-2.7.0/bin/kafka-run-class.sh
# LOG_DIR="$base_dir/logs"
LOG_DIR="/data/log/kafka"
分发到其他节点并修改/opt/module/kafka_2.12-2.7.0/config/server.properties 配置文件中的broker.id
bin目录下启动kafka
kafka-server-start.sh -daemon./config/server.properties
使用jps可以看到kafka进程
在node2创建kafka.service文件
sudo vim /usr/lib/systemd/system/kafka.service
[Unit] Description=Kafka Requires=zookeeper.service After=network.target zookeeper.service [Service] Type=simple Environment=JAVA_HOME=/usr/java/jdk1.8.0_212 ExecStart=/opt/module/kafka_2.12-2.7.0/bin/kafka-server-start.sh /opt/module/kafka_2.12-2.7.0config/server.properties ExecStop=/opt/module/kafka_2.12-2.7.0/bin/kafka-server-stop.sh User=bigdata Group=bigdata Restart=on-failure SuccessExitStatus=0 143 [Install] WantedBy=multi-user.target
分发到其他节点
启动、关闭验证是否配置成功
sudo systemctl start kafka
sudo systemctl stop kafka
sudo systemctl status kafka
#成功后关闭开机自启
sudo systemctl disable kafka
在node1创建一个集群脚本
vim /opt/bash/kafka.sh
#!/bin/bash # kafka节点列表 # 在这里修改为自己的实际节点IP地址 nodes=(node2 node3 node4) # 循环遍历所有节点执行相同的命令 cmd=$1 if [ "$cmd" != "" ];then if [[ "start" == "$cmd" ]] || [[ "stop" == "$cmd" ]] || [[ "restart" == "$cmd" ]] || [[ "status" == "$cmd" ]] ;then for host in ${nodes[@]} ; do echo "=================== $host ==================" ssh $host "sudo systemctl $cmd kafka" echo "$cmd Kafka on $host success..." done else echo " 输入的参数不对" echo " start 启动kafka集群" echo " stop 停止kafka集群" echo " restart 重启kafka集群" echo " status 查看kafka集群" fi else echo "请传入一个参数(start|stop|restart|status)" fi
添加执行权限
chmod +x /opt/bash/kafka.sh
脚本使用
/opt/bash/kafka.sh start/stop/status/restart
创建一个topic,3个分区,3个副本
./bin/kafka-topics.sh --create --zookeeeper node2:2181,node3:2181,node4:2181/kafka -replication-factor 3 --partitions 3 --topic test
查看topic列表
./bin/kafka-topics.sh --list --zookeeper node2:2181,node3:2181,node4:2181/kafka
用一台服务器模拟生产者,发布消息
./bin/kafka-console-producer.sh --broker-list node3:9092 --topic test
用另一台服务器模拟消费者,消费消息
./bin/kafka-console-consumer.sh --bootstrap-server node3:9092 --topic test --from-beginning
注:–from-beginning 表示从头接收,否则只能从当前的offset接收新消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。