Kafka SASL_SCRAM Authentication + ACL: Dynamically Adding Users and Granting Permissions

I. Basic Information
1. Machine list: 192.168.19.201, 192.168.19.202, 192.168.19.203 (figure omitted)
2. basedir: /cust/cig/{utils,logs,data,scripts}
jdk: /cust/cig/utils/jdk1.8.0_201
zookeeper: /cust/cig/utils/zookeeper-3.4.14
kafka: /cust/cig/utils/kafka_2.12-2.2.0

II. Installation and Deployment
1. Install the JDK (details omitted)
Note: create a symlink for java, otherwise Kafka cannot find the java binary:

# ln -s /cust/cig/utils/jdk1.8.0_201/bin/java /usr/bin/java
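A quick sanity check that the symlink resolves (output assumes the JDK version above):

# java -version		# should report java version "1.8.0_201"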

2. Install ZooKeeper
(1) Unpack

# tar zxvf zookeeper-3.4.14.tar.gz

(2) Edit conf/zoo.cfg; the contents are identical on all three machines:

# cp conf/zoo_sample.cfg conf/zoo.cfg
# cat conf/zoo.cfg |grep -v ^#
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/cust/cig/data/zookeeper
clientPort=2181
server.1=192.168.19.201:2888:3888
server.2=192.168.19.202:2888:3888
server.3=192.168.19.203:2888:3888
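The dataDir above must exist before ZooKeeper starts; create it on all three nodes:

# mkdir -p /cust/cig/data/zookeeper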

(3) Create the myid file under the dataDir defined in zoo.cfg; its content is the node number from the corresponding server.N line above:

# echo 1 >/cust/cig/data/zookeeper/myid	# on the other two machines: echo 2 > ... and echo 3 > ... respectively

(4) Edit bin/zkEnv.sh to define the log path:

# cat bin/zkEnv.sh |grep  -C 1 ZOO_LOG_DIR

if [ "x${ZOO_LOG_DIR}" = "x" ]
then
    ZOO_LOG_DIR="/cust/cig/logs/zookeeper"
fi
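Likewise, create the log directory referenced above on each node:

# mkdir -p /cust/cig/logs/zookeeper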

(5) Start

# zkServer.sh start		# note: the ZooKeeper environment variables must be set (see the sketch below)
# zkServer.sh status	# check status; in a 3-node cluster, 1 node is the leader and 2 are followers
# zkServer.sh stop		# stop
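One way to satisfy the environment-variable note above is to put the ZooKeeper bin directory on the PATH; a minimal sketch (adjust to your shell profile):

# echo 'export PATH=$PATH:/cust/cig/utils/zookeeper-3.4.14/bin' >> /etc/profile
# source /etc/profile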

(6) Create a systemd unit file so ZooKeeper starts on boot:

# cat /etc/systemd/system/zookeeper.service 
[Unit]
Description=Zookeeper service
After=network.target

[Service]
Type=forking
ExecStart=/cust/cig/utils/zookeeper-3.4.14/bin/zkServer.sh start
ExecStop=/cust/cig/utils/zookeeper-3.4.14/bin/zkServer.sh stop 
ExecReload=/cust/cig/utils/zookeeper-3.4.14/bin/zkServer.sh restart
Restart=on-failure

[Install]
WantedBy=multi-user.target


# systemctl enable zookeeper	# enable start on boot
# systemctl start zookeeper		# start

3. Install Kafka with SASL/SCRAM + ACL, so that users and authorizations can be added dynamically
(1) Unpack

# tar zxvf kafka_2.12-2.2.0.tgz

(2) Create the user admin with password admin (run on one machine only; the SCRAM credentials are stored in ZooKeeper and visible to all brokers):

# bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin

# describe the admin user
# bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --describe --entity-type users  --entity-name admin

(3) Create the JAAS config file (required on all three machines):

# cat config/kafka-broker.jaas
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};

(4) Edit bin/kafka-server-start.sh, inserting one line just above the final exec line:

# tail -2 bin/kafka-server-start.sh
export KAFKA_OPTS=-Djava.security.auth.login.config="/cust/cig/utils/kafka_2.12-2.2.0/config/kafka-broker.jaas"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

(5) Edit config/server.properties:

# cat config/server.properties |grep -v ^# |grep -v ^$

# unique identifier; must differ on each machine
broker.id=1
###################### SASL #########################
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.19.201:9092
advertised.listeners=SASL_PLAINTEXT://192.168.19.201:9092
####################### ACL ########################
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# data storage path
log.dirs=/cust/cig/data/kafka/
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# do not auto-create topics
auto.create.topics.enable=false
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.io.buffer.size=524288000
zookeeper.connect=192.168.19.201:2181,192.168.19.202:2181,192.168.19.203:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000

################# leader #################
auto.leader.rebalance.enable=false
unclean.leader.election.enable=false

############# messages #################
message.max.bytes=5242880
replica.fetch.max.bytes=5242880
# note: the broker-level key is default.replication.factor (plain replication.factor is not a broker config)
default.replication.factor=3
min.insync.replicas=2
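On the other two brokers only the per-host values change; assuming the same layout, brokers 2 and 3 differ only in:

# broker 2 (192.168.19.202)
broker.id=2
listeners=SASL_PLAINTEXT://192.168.19.202:9092
advertised.listeners=SASL_PLAINTEXT://192.168.19.202:9092

# broker 3 (192.168.19.203)
broker.id=3
listeners=SASL_PLAINTEXT://192.168.19.203:9092
advertised.listeners=SASL_PLAINTEXT://192.168.19.203:9092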

(6) Start

# ./bin/kafka-server-start.sh -daemon ./config/server.properties
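To verify the broker came up, check the server log (path assumes the default logs directory under the Kafka install):

# tail -f /cust/cig/utils/kafka_2.12-2.2.0/logs/server.log	# look for "started (kafka.server.KafkaServer)"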

(7) Enable start on boot

# cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target  zookeeper.service

[Service]
Type=simple
ExecStart=/cust/cig/utils/kafka_2.12-2.2.0/bin/kafka-server-start.sh /cust/cig/utils/kafka_2.12-2.2.0/config/server.properties
ExecStop=/cust/cig/utils/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

# kill -9 `ps aux|grep kafka|grep -v grep |awk '{print $2}'`		# kill the process started manually above
# systemctl enable kafka
# systemctl start kafka

III. Usage: Command Set
Since authentication is enabled, many commands must run as the admin user, so first create a client config for admin:

# cat auth/admin.conf 
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

1. Topics
(1) Create

# ./bin/kafka-topics.sh --create --zookeeper 192.168.19.201:2181 --replication-factor 3 --partitions 3 --topic mytest	# ZooKeeper-based commands talk to ZooKeeper directly and need no SASL client config

(2) Delete

# ./bin/kafka-topics.sh --delete --zookeeper 192.168.19.201:2181 --topic mytest

(3) List / describe

# ./bin/kafka-topics.sh --list --zookeeper 192.168.19.201:2181
# ./bin/kafka-topics.sh --describe --zookeeper 192.168.19.201:2181 --topic xxx

(4) Alter

## change the partition count
# ./bin/kafka-topics.sh --alter --zookeeper 10.73.5.241:2181 --topic xxx --partitions xxx

## changing the replication factor requires a partition reassignment (kafka-reassign-partitions.sh) and is more involved

2. Users

(1) Add

# ./bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456]' --entity-type users --entity-name mytest

(2) Describe

# ./bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --describe --entity-type users  --entity-name mytest

(3) Update (re-running --alter with a new password overwrites the existing credentials)

# ./bin/kafka-configs.sh --zookeeper 192.168.19.201:2181 --alter --add-config 'SCRAM-SHA-256=[password=mytest]' --entity-type users --entity-name mytest
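The produce/consume commands in subsection 4 below authenticate as this user via auth/mytest.conf; a minimal sketch mirroring admin.conf (assumes the password set above):

# cat auth/mytest.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="mytest" password="mytest";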

3. Authorization

(1) Read (consume) permission

# ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.19.201:2181 --add --allow-principal User:"mytest" --consumer --topic 'mytest' --group '*'

(2) Write (produce) permission

# ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.19.201:2181 --add --allow-principal User:"mytest" --producer --topic 'mytest'

(3) List

# ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.19.201:2181 --list
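(4) Revoke: the same tool supports --remove (add --force to skip the confirmation prompt); for example, revoking the producer grant above:

# ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.19.201:2181 --remove --allow-principal User:"mytest" --producer --topic 'mytest' --force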

4. Messages

(1) Produce

# ./bin/kafka-console-producer.sh --broker-list 10.73.5.241:9092 --topic mytest --producer.config auth/mytest.conf

(2) Consume

# ./bin/kafka-console-consumer.sh --bootstrap-server 10.73.5.241:9092 --topic mytest --consumer.config auth/mytest.conf

5. Inspecting message contents (message tracing)

# /cust/utils/kafka_2.11-2.2.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments  --files /cust/data/kafka/mytest-0/00000000000000000000.log --print-data-log

6. Offsets

(1) List all group IDs

# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.201:9092 --list --command-config auth/admin.conf

(2) View offsets

# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.201:9092 --describe --group notification_email_sender_group --command-config auth/admin.conf

(Screenshot of the --describe output omitted; the key columns are explained below.)

# Key to the columns in the output
Topic: topic name
Partition: partition ID
Current-Offset: the consumer group's consumed position in that partition
Log-End-Offset: the position of the last message written to that partition
Lag: number of messages not yet consumed (Lag = Log-End-Offset - Current-Offset)

(3) Reset offsets

# bin/kafka-consumer-groups.sh --bootstrap-server  192.168.19.201:9092 --group ordersubmit --topic order_receiver_koala --execute --reset-offsets --to-offset 0 --command-config auth/admin.conf
--to-earliest	set each partition's offset to the earliest available (0)
--to-latest  	set each partition's offset to the latest
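For example, resetting the same group to the earliest offsets instead of an explicit position:

# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.201:9092 --group ordersubmit --topic order_receiver_koala --execute --reset-offsets --to-earliest --command-config auth/admin.conf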

IV. Miscellaneous
1. When message volume is very high, the number of open file handles can grow too large; an example production config that bounds retention (see also the ulimit sketch after the notes):

broker.id=4
listeners=PLAINTEXT://192.168.43.101:9092
advertised.listeners=PLAINTEXT://192.168.43.101:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/alidata1/admin/data/kafka
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
#log.retention.hours=48         ## retention: 2 days
#log.retention.bytes=2*1073741824
log.retention.bytes=2147483648
log.segment.bytes=1073741824
log.cleanup.policy=delete
log.retention.check.interval.ms=300000
zookeeper.connect=11.100.5.137:2181,11.100.5.138:2181,11.100.5.139:2181/prd
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=5000
num.replica.fetchers=2
replica.fetch.min.bytes=1
replica.fetch.max.bytes=5242880
replica.fetch.wait.max.ms=1000
replica.lag.time.max.ms=15000
message.max.bytes=2097152
default.replication.factor=2
auto.create.topics.enable=true
delete.topic.enable=true
max.incremental.fetch.session.cache.slots=10000
auto.leader.rebalance.enable=false
unclean.leader.election.enable=false

## Notes: pay attention to the following parameters
# log.retention.hours=48           ## keep messages for 2 days
# log.retention.bytes=2147483648   ## maximum size per partition; once it exceeds 2 GB, old segments are deleted
# log.segment.bytes=1073741824     ## a topic partition is stored as a set of segment files; this controls each segment's size (can be overridden per topic at creation time)
# log.cleanup.policy=delete
# For more detail see: https://www.jianshu.com/p/c9a54a587f0e
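If the handle count does become a problem, raising the OS open-file limit for the Kafka user is the usual companion fix; a sketch (the limits are illustrative, not from the original setup):

# ulimit -n		# show the current open-file limit
# cat >> /etc/security/limits.conf <<'EOF'
*	soft	nofile	100000
*	hard	nofile	100000
EOF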