赞
踩
[zhang@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
修改名称
[zhang@hadoop102 module]$ mv kafka_2.11-2.4.1/ kafka
创建logs文件夹
[zhang@hadoop102 kafka]$ mkdir logs
修改配置文件
[zhang@hadoop102 kafka]$ cd config/
[zhang@hadoop102 config]$ vi server.properties
修改或者增加以下内容:
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/data
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
[zhang@hadoop102 config]$ cd ..
[zhang@hadoop102 kafka]$ cd ..
[zhang@hadoop102 module]$ ll
分发
[zhang@hadoop102 module]$ xsync kafka/
去103 104上修改broker.id
配置环境变量
[zhang@hadoop102 kafka]$ sudo vim /etc/profile.d/my_env.sh
[zhang@hadoop102 kafka]$ source /etc/profile.d/my_env.sh
分发
[zhang@hadoop102 kafka]$ sudo /home/zhang/bin/xsync /etc/profile.d/my_env.sh
[zhang@hadoop103 config]$ source /etc/profile.d/my_env.sh
[zhang@hadoop104 config]$ source /etc/profile.d/my_env.sh
启动集群
[zhang@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
[zhang@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
[zhang@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
在/home/zhang/bin目录下创建脚本kf.sh
[zhang@hadoop102 bin]$ vim kf.sh
添加脚本内容
- #! /bin/bash
-
- case $1 in
- "start"){
- for i in hadoop102 hadoop103 hadoop104
- do
- echo " --------启动 $i Kafka-------"
- ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
- done
- };;
- "stop"){
- for i in hadoop102 hadoop103 hadoop104
- do
- echo " --------停止 $i Kafka-------"
- ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
- done
- };;
- esac
[zhang@hadoop102 bin]$ chmod 777 kf.sh
使用脚本命令关闭
启动命令也成功
查看Kafka Topic列表
[zhang@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list
创建Kafka Topic
进入到/opt/module/kafka/目录下创建日志主题
[zhang@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 1 --partitions 1 --topic topic_log
删除Kafka Topic
[zhang@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_log
Kafka生产消息
[zhang@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic topic_log
>hello world
>zhang zhang
Kafka消费消息
[zhang@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log
hadoop102、hadoop103、hadoop104的网络带宽都设置为100mbps。
关闭hadoop102主机,并根据hadoop102克隆出hadoop105(修改IP和主机名称)
hadoop105的带宽不设限。
关闭集群:一定要先关闭Kafka,再关闭zk,如果先关闭zk,就会关不掉Kafka。
hadoop105连接上xshell
启动hadoop102
设置105网络不受限
创建一个test topic,设置为3个分区2个副本
[zhang@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 2 --partitions 3 --topic test
测试
[zhang@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 10000000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
record-size是一条信息有多大,单位是字节。
num-records是总共发送多少条信息。
throughput 是每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测出生产者最大吞吐量。
batch.size默认值是16k。
batch.size较小,会降低吞吐量。比如说,批次大小为0则完全禁用批处理,会一条一条发送消息);
batch.size过大,会增加消息发送延迟。比如说,Batch设置为64k,但是要等待5秒钟Batch才凑满了64k,才能发送出去。那这条消息的延迟就是5秒钟。
总结
同时设置batch.size和 linger.ms,就是哪个条件先满足就都会将消息发送出去
Kafka需要考虑高吞吐量与延时的平衡。
[zhang@hadoop102 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
--broker-list指定Kafka集群地址
--topic 指定topic的名称
--fetch-size 指定每次fetch的数据的大小
--messages 总共要消费的消息个数
增加fetch-size值,观察消费吞吐量。
总结
吞吐量受网络带宽和fetch-size的影响
项目经验值Kafka分区数计算
分区数一般设置为:3-10个
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。