赞
踩
kafka下载地址:https://kafka.apache.org/downloads
# 下载文件
wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.11-2.2.1.tgz
# 文件解压缩
tar -zxvf kafka_2.11-2.2.1.tgz
# 创建软连接
ln -s kafka_2.12-2.2.0 kafka
kafka服务的配置文件为 kafka/config/server.properties ,内容如下:
############################# Server Basics ############################# broker.id=0 ######################### Socket Server Settings ######################## listeners=PLAINTEXT://192.168.1.128:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ########################## Log Basics #################################### log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 #################### Internal Topic Settings ############################ offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ######################## Log Flush Policy ################################ log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 ################### Group Coordinator Settings ############################ group.initial.rebalance.delay.ms=0
配置说明:
单机模式需要修改 kafka/config/server.properties配置文件:
listener 配置项默认为注释状态,找到配置行,修改为本机IP及需要开启的端口
# 本机IP可省略
# listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.1.128:9092
log.dir 为kafka数据存储地址,修改配置项:
log.dirs=/usr/local/kafka/data
启动自带的zookeeper服务
# 后台启动
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动kafka服务
# 后台启动
./bin/kafka-server-start.sh -daemon config/server.properties
# 创建topic,多个服务用逗号分隔
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# 查看topic列表
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 启动一个控制台消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
# 启动一个控制台生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
下载地址:https://zookeeper.apache.org/releases.html
配置文件说明:
# 心跳时间间隔 tickTime=2000 # 初始连接心跳书限制 initLimit=10 # 每次请求与应答间的心跳数限制 syncLimit=5 # 存储快照的目录, 不要使用/tmp用于存储, 这里只是一个示例 dataDir=/tmp/zookeeper # zookeeper服务端口 clientPort=2181 # 客户端连接的最大数目, 修改此选项以增加更多客户端 #maxClientCnxns=60 # 保存在dataDir中的快照数 #autopurge.snapRetainCount=3 # 清除任务间隔(小时), 设置为 “0” 以禁用自动清除功能 #autopurge.purgeInterval=1
准备三台服务器,并分别下载并解压zookeeper-3.4.12,三台服务器的IP如下:
拷贝并重命名配置文件:
cp zoo_sample.cfg zoo.cfg
在三台服务器的 zoo.cfg 配置文件末尾添加如下配置:
# 配置格式 server.id = ip:port:port
# 配置的id唯一,不可重复
# 2888端口为集群内机器通信使用
# 3888端口为leader选举使用
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888
在 dataDir 配置目录下新建myid文件,写入id的值:
# 此处仅为示例,dataDir不要使用/tmp目录
vim /tmp/zookeeper/myid
文件内容如下:
启动zookeeper时注意开放端口,centos 7.0 开放端口命令如下:
# --zone 作用域
# --add-port 添加端口,格式:端口/通讯协议
# --permanent 永久生效,无此参数时服务器重启后配置失效
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent
# 配置完成后,重启防火墙
firewall-cmd --reload
# 查看防火墙已开放的端口
firewall-cmd --list-ports
使用命令 ./zkServer.sh start 分别启动三台服务器的zookeeper,集群启动完成后,三台服务器状态如下:
在三台服务器上,下载安装kafka,修改配置文件:
# broker.id,每个kafka配置不同的ID,唯一,不可重复
broker.id=0
# listeners 配置
listeners=PLAINTEXT://:9092
# zookeeper配置,多个地址用逗号分隔
zookeeper.connect=192.168.1.101:2181,192.168.1.103:2181,192.168.1.103:2181
配置完成后启动kafka
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
zookeeper 命令用于在 zookeeper 服务上执行操作,首先执行命令,打开新的 session 会话,进入终端。
$ ./zkCli.sh
进入终端后如下显示:
查看命令帮助:
命令说明:
命令 | 格式 | 说明 |
---|---|---|
ls | ls path [watch] | 查看路径下的目录列表 |
ls2 | ls2 path [watch] | 查看路径下的目录列表,比 ls 更详细 |
stat | stat path [watch] | 查看节点状态信息 |
set | set path data [version] | 修改节点存储的数据 |
get | get path [watch] | 获取节点数据和状态信息 |
create | create [-s] [-e] path data acl | 创建节点 |
delete | delete path [version] | 删除节点 |
rmr | rmr path | 递归删除文件夹 |
history | history | 查看执行的历史命令 |
quit | quit | 退出终端 |
将kafka/config目录下server.properties复制并重命名
cp server.properties server-sasl.properties
在配置文件末尾添加如下配置:
# 修改listeners
listeners=SASL_PLAINTEXT://:9092
# 权限配置
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin
在 kafka/config目录下创建kafka-server-jaas.conf文件,内容如下:
# 配置文件的分号千万不能忘
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin";
};
复制 kafka/bin 目录下的 kafka-server-start.sh 并重命名
cp kafka-server-start.sh kafka-server-start-sasl.sh
打开文件,将最后一行:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改为(路径改为服务器的kafka路径):
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/user/kafka/config/kafka-server-jaas.conf kafka.Kafka "$@"
使用 zkCli.sh 命令进入zookeeper控制台,执行如下命令
# 创建用户
create /digest user
# 添加权限
addauth digest admin:admin
# 设置acl加密
setAcl /digest auth:admin:admin:crwda
./bin/kafka-server-start-sasl.sh config/server-sasl.properties
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。