I. Basic information
1. Machine list: three nodes, 192.168.19.201 / 192.168.19.202 / 192.168.19.203
2. basedir: /cust/cig/{utils,logs,data,scripts}
jdk: /cust/cig/utils/jdk1.8.0_201
zookeeper: /cust/cig/utils/zookeeper-3.4.14
kafka: /cust/cig/utils/kafka_2.12-2.2.0
II. Installation and deployment
1. Install the JDK: omitted
Note: create a symlink for java, otherwise Kafka cannot find the java binary
# ln -s /cust/cig/utils/jdk1.8.0_201/bin/java /usr/bin/java
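A quick check that the symlink resolves to the intended JDK:
# readlink -f /usr/bin/java   # should print /cust/cig/utils/jdk1.8.0_201/bin/java
# java -version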
2. Install ZooKeeper
(1) Unpack
# tar zxvf zookeeper-3.4.14.tar.gz
(2) Edit conf/zoo.cfg; the content is identical on all 3 machines
# cp conf/zoo_sample.cfg conf/zoo.cfg
# cat conf/zoo.cfg |grep -v ^#
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/cust/cig/data/zookeeper
clientPort=2181
server.1=192.168.19.201:2888:3888
server.2=192.168.19.202:2888:3888
server.3=192.168.19.203:2888:3888
(3) Create the myid file under the dataDir defined in zoo.cfg; its content is the N from the matching server.N line above
# echo 1 >/cust/cig/data/zookeeper/myid   # on the other two machines: echo 2 > .... and echo 3 > .... respectively
(4) Edit bin/zkEnv.sh to set the log directory
# cat bin/zkEnv.sh |grep -C 1 ZOO_LOG_DIR
if [ "x${ZOO_LOG_DIR}" = "x" ]
then
ZOO_LOG_DIR="/cust/cig/logs/zookeeper"
fi
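Instead of repeating the steps above on each machine, the finished tree can be pushed to the other two nodes; only the myid file (which lives in dataDir, outside the tree) differs per node. A sketch, assuming root ssh access between the machines:
# for i in 2 3; do scp -r /cust/cig/utils/zookeeper-3.4.14 192.168.19.20$i:/cust/cig/utils/; ssh 192.168.19.20$i "mkdir -p /cust/cig/data/zookeeper && echo $i > /cust/cig/data/zookeeper/myid"; done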
(5) Start
# zkServer.sh start    # note: requires the ZK environment variables (PATH) to be set
# zkServer.sh status   # check status; in a 3-node ensemble, one node is the leader and the other two are followers
# zkServer.sh stop     # shut down
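Ensemble health can also be checked with ZooKeeper's four-letter-word commands (assumes nc is installed):
# echo ruok | nc 192.168.19.201 2181   # a healthy server answers imok
# echo stat | nc 192.168.19.201 2181   # shows this node's Mode: leader or follower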
(6) Write a systemd unit so ZooKeeper starts on boot
# cat /etc/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target

[Service]
Type=forking
ExecStart=/cust/cig/utils/zookeeper-3.4.14/bin/zkServer.sh start
ExecStop=/cust/cig/utils/zookeeper-3.4.14/bin/zkServer.sh stop
ExecReload=/cust/cig/utils/zookeeper-3.4.14/bin/zkServer.sh restart
Restart=on-failure

[Install]
WantedBy=multi-user.target

# systemctl enable zookeeper   # enable start on boot
# systemctl start zookeeper    # start now
3. Install Kafka: with SASL-SCRAM + ACL, so that users and permissions can be added dynamically
(1) 解压
# tar zxvf kafka_2.12-2.2.0.tgz
(2) Create the user admin with password admin   # run on one machine only
# bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
# view the admin user's credential info
# bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --describe --entity-type users --entity-name admin
(3) Write the JAAS config file   # required on all 3 machines
# cat config/kafka-broker.jaas
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
(4) Edit bin/kafka-server-start.sh: add one line just above the last line
# tail -2 bin/kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/cust/cig/utils/kafka_2.12-2.2.0/config/kafka-broker.jaas"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
(5) Edit config/server.properties
# cat config/server.properties |grep -v ^$
# unique broker ID; must be different on each machine
broker.id=1
###################### SASL #########################
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.19.201:9092
advertised.listeners=SASL_PLAINTEXT://192.168.19.201:9092
####################### ACL ########################
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# data storage path
log.dirs=/cust/cig/data/kafka/
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# topics must be created explicitly, not auto-created
auto.create.topics.enable=false
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.io.buffer.size=524288000
zookeeper.connect=192.168.19.201:2181,192.168.19.202:2181,192.168.19.203:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000
################# leader #################
auto.leader.rebalance.enable=false
unclean.leader.election.enable=false
############# messages #################
message.max.bytes=5242880
replica.fetch.max.bytes=5242880
default.replication.factor=3
min.insync.replicas=2
(6) Start
# ./bin/kafka-server-start.sh -daemon ./config/server.properties
(7) Enable start on boot
# cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service

[Service]
Type=simple
ExecStart=/cust/cig/utils/kafka_2.12-2.2.0/bin/kafka-server-start.sh /cust/cig/utils/kafka_2.12-2.2.0/config/server.properties
ExecStop=/cust/cig/utils/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

# kill -9 `ps aux|grep kafka|grep -v grep |awk '{print $2}'`   # kill the broker started manually above, then manage it via systemd
# systemctl enable kafka
# systemctl start kafka
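Once all three brokers are up, their registration can be verified via the zookeeper-shell tool shipped with Kafka; it should list the three broker ids (e.g. [1, 2, 3] if the ids were assigned 1 to 3):
# ./bin/zookeeper-shell.sh 192.168.19.201:2181 ls /brokers/ids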
III. Usage: command reference
Because authentication is now enabled, many commands must run as the admin user, so an admin client config is needed:
# cat auth/admin.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
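The message commands in subsection 4 below also reference auth/mytest.conf, which is not shown here; modeled on admin.conf, it would look like the following sketch (assumes the mytest user and the password set in subsection 2):
# cat auth/mytest.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="mytest" password="123456";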
1. Topics
(1) Create
# ./bin/kafka-topics.sh --create --zookeeper 192.168.19.201:2181 --replication-factor 3 --partitions 3 --topic mytest
# (kafka-topics.sh talks to ZooKeeper directly here, so no SASL client config is needed)
(2) Delete
# ./bin/kafka-topics.sh --delete --zookeeper 192.168.19.201:2181 --topic mytest
(3) List / describe
# ./bin/kafka-topics.sh --list --zookeeper 192.168.19.201:2181
# ./bin/kafka-topics.sh --describe --zookeeper 192.168.19.201:2181 --topic xxx
(4) Modify
## increase the partition count
# ./bin/kafka-topics.sh --alter --zookeeper 192.168.19.201:2181 --topic mytest --partitions xxx
## changing the replication factor requires a reassignment plan via kafka-reassign-partitions.sh and is more involved
2. Users
(1) Create
# ./bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456]' --entity-type users --entity-name mytest
(2) View
# ./bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --describe --entity-type users --entity-name mytest
(3) Update (set a new password)
# ./bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --alter --add-config 'SCRAM-SHA-256=[password=mytest]' --entity-type users --entity-name mytest
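(4) Delete
A credential can also be removed outright when revoking a user; kafka-configs.sh supports this via --delete-config:
# ./bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name mytest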
3. Authorization
(1) Grant read (consume)
# ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.19.201:2181 --add --allow-principal User:"mytest" --consumer --topic 'mytest' --group '*'
(2) Grant write (produce)
# ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.19.201:2181 --add --allow-principal User:"mytest" --producer --topic 'mytest'
(3) List
# ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.19.201:2181 --list
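(4) Revoke
Revoking mirrors granting, with --remove in place of --add (append --force to skip the confirmation prompt):
# ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.19.201:2181 --remove --allow-principal User:"mytest" --producer --topic 'mytest'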
4. Messages
(1) Produce
# ./bin/kafka-console-producer.sh --broker-list 192.168.19.201:9092 --topic mytest --producer.config auth/mytest.conf
(2) Consume
# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.19.201:9092 --topic mytest --consumer.config auth/mytest.conf
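A quick end-to-end check that the credentials and ACLs line up: produce one message and read it back (assumes the grants from subsection 3 above):
# echo "hello kafka" | ./bin/kafka-console-producer.sh --broker-list 192.168.19.201:9092 --topic mytest --producer.config auth/mytest.conf
# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.19.201:9092 --topic mytest --consumer.config auth/mytest.conf --from-beginning --max-messages 1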
5. Inspect message contents on disk (message tracing)
# /cust/cig/utils/kafka_2.12-2.2.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /cust/cig/data/kafka/mytest-0/00000000000000000000.log --print-data-log
6. Offsets
(1) List all group ids
# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.201:9092 --list --command-config auth/admin.conf
(2) View offsets
# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.201:9092 --describe --group notification_email_sender_group --command-config auth/admin.conf
# Field meanings in the --describe output:
Topic          : topic name
Partition      : partition ID
Current-Offset : the consumer's committed position in this partition (messages consumed so far)
Log-End-Offset : messages written to this partition so far (the log-end position)
Lag            : messages not yet consumed (Lag = Log-End-Offset - Current-Offset)
(3) Reset offsets
# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.201:9092 --group ordersubmit --topic order_receiver_koala --execute --reset-offsets --to-offset 0 --command-config auth/admin.conf
--to-earliest   set each partition's offset to the earliest available (0)
--to-latest     set each partition's offset to the latest (log end)
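Running the same command without --execute is a dry run that only prints the offsets that would be applied. --to-datetime resets to the first offset at or after a given timestamp (the timestamp below is an example):
# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.201:9092 --group ordersubmit --topic order_receiver_koala --reset-offsets --to-datetime 2019-06-01T00:00:00.000 --command-config auth/admin.conf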
IV. Miscellaneous
1. With very high message volume, the broker can end up holding too many open file handles. Below is an example production server.properties tuned for that situation:
broker.id=4
listeners=PLAINTEXT://192.168.43.101:9092
advertised.listeners=PLAINTEXT://192.168.43.101:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/alidata1/admin/data/kafka
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
#log.retention.hours=48   ## retention: 2 days
#log.retention.bytes=2*1073741824
log.retention.bytes=2147483648
log.segment.bytes=1073741824
log.cleanup.policy=delete
log.retention.check.interval.ms=300000
zookeeper.connect=11.100.5.137:2181,11.100.5.138:2181,11.100.5.139:2181/prd
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=5000
num.replica.fetchers=2
replica.fetch.min.bytes=1
replica.fetch.max.bytes=5242880
replica.fetch.wait.max.ms=1000
replica.lag.time.max.ms=15000
message.max.bytes=2097152
default.replication.factor=2
auto.create.topics.enable=true
delete.topic.enable=true
max.incremental.fetch.session.cache.slots=10000
auto.leader.rebalance.enable=false
unclean.leader.election.enable=false

## Explanation: the parameters to pay attention to are the following
# log.retention.hours=48          ## keep messages for 2 days
# log.retention.bytes=2147483648  ## maximum size per partition; above 2G the oldest segments are deleted
# log.segment.bytes=1073741824    ## a topic partition is stored as a set of segment files; this caps the size of each segment (can be overridden per topic at creation time)
# log.cleanup.policy=delete
# For more detail, see: https://www.jianshu.com/p/c9a54a587f0e