赞
踩
Kafka安装前提,需要有Java环境以及zookeeper集群。
三台服务器地址:
192.168.11.x1 alary001
192.168.11.x2 alary002
192.168.11.x3 alary003
第一次学习时,还在网上找各种下载地址以及去官网下载,发现各种麻烦和不靠谱,后来慢慢学会,
可以通过国内的各个镜像站进行下载,
清华大学开源软件镜像站——Apach下相关软件
找到Kafka,下载对应版本即可。
下载好后,将kafka的tar.gz安装包通过rz kafka-xxxx.tar.gz文件传至服务器。
进入到kafka的kakfa-xxx.tar.gz的目录下,解压文件:
tar -zxvf kafka-xxx.tar.gz
可以将kafka-xxx进行更名:
mv kafka-xxx kafka
进入 /etc/profile配置环节变量
vi /etc/profile
# set environment virables
export KAFKA_HOME=/home/hadoop/app/kafka
export PATH=$KAFKA_HOME/bin:$PATH
重启环境变量配置信息:
source /etc/profile
进入kafka配置文件目录:
cd /home/hadoop/kafka/config/
通过ll或者ls查看目录下的文件。
首先,我们来修改server.properties这个配置文件
配置内容如下:
broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://10.1.32.85:9092 #kafka监听地址
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/tmp/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
num.partitions=1 #默认的分区数,一个topic默认1个分区数
num.recovery.threads.per.data.dir=1
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
zookeeper.connect=10.1.32.85:2181,10.1.32.193:2181,10.1.32.161:2181 #设置zookeeper的连接端口
zookeeper.connection.timeout.ms=6000 #连接zookeeper的超时时间
之前第一个配置时,遇到一个坑,直接在配置后面加注释,如:
zookeeper.connection.timeout.ms=6000 #连接zookeeper的超时时间
在解析配置文件时,会报错无法识别后面的汉字等内容,所以讲所有注释全部换行 注释,如下:
zookeeper.connection.timeout.ms=6000
#连接zookeeper的超时时间
即可解决问题;
server.propertis配置:
broker.id=1
#每台服务器的broker.id都不能相同
listeners=PLAINTEXT://192.19.11.x1:9092
#在log.retention.hours=168下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
zookeeper.connect=alary001:2181,alary002:2181,alary003:2181
#设置zookeeper的连接端口
此处,要想直接通过hostname来访问,需要配置host信息。
vi /etc/hosts
192.168.11.x1 alary001
192.168.11.x2 alary002
192.168.11.x3 alary003
配置完第一台机器alary001后,通过scp 将第一台机器的kakfa分发给其他机器,并修改相关内容。
启动kafka必须三台都启动,如果是通过xshell进行练习时,可以通过在xshell中鼠标右键,发送键输入到所有会话,即可同时启动kafka。
kafka启动命令:
kafka-server-start.sh -daemon /home/hadoop/kafka/config/server.properties
通过jps命令来检查服务是否启动:
kafka-topics.sh --create --zookeeper alary001:2181 --replication-factor 2 --partitions 1 --topic lujin
#--replication-factor 2
#复制两份
#--partitions 1
#创建1个分区
# --topic
#主题名称
在kafka的发布机器(alary001)创建一个broker:
kafka-console-producer.sh --broker-list alary001:9092 --topic lujin
另外的其他机器上,创建订阅者:
kafka-console-consumer.sh --zookeeper alary002:2181 --topic lujin --from-beginning
若通过发布者发送的消息,订阅者成功收到,则kafka集群搭建完毕!
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
查询集群描述
bin/kafka-topics.sh --describe --zookeeper
消费者列表查询
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
显示某个消费组的消费详情(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。