当前位置:   article > 正文

Apache zookeeper kafka 开启SASL安全认证 —— 筑梦之路_kafka sasl认证

kafka sasl认证

简介

  Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发,支持多语言(如Java、Python、Go等)客户端,它可以水平扩展和具有高吞吐量特性而被广泛使用,并与多类开源分布式处理系统进行集成使用。

  Kafka作为一款开源的、轻量级的、分布式、可分区和具备复制备份的、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。与传统消息系统相比,Kafka能够更好的处理活跃的流数据,让数据在各个子系统中高性能、低延迟地不停流转。

自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka集群的安全性,Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现,此方案主要介绍SASL方式。

SASL验证分类
验证方式kafka版本特点
SASL/PLAIN0.10.0.0不能动态添加用户
SASL/SCRAM0.10.2.0可以动态添加用户
SASL/Kerberos0.9.0.0需要独立部署验证服务
SASL/oauthbearer2.0.0

需要自己实现接口,

实现token的创建和验证,

需要额外的oauth服务

 使用SSL加密在代理和客户端之间,代理之间或代理和工具之间传输的数据

SCRAM认证配置的优点:

  如果使用PLAIN认证有个问题,就是不能动态新增用户,每次添加用户后,需要重启正在运行的Kafka集群才能生效。

  因此,在生产环境中,这种认证方式不符合实际业务场景,不利于后期扩展。然而使用SCRAM认证,可以动态新增用户,添加用户后,可以不用重启正在运行的Kafka集群即可进行鉴权。所以生产环境推荐使用SCRAM+PLAIN搭配的认证方案。

 

配置zookeeper集群启用SASL

zookeeper官方地址:Apache ZooKeeper

1. 配置zookeeper,启用sasl认证

  1. cat zoo.cfg
  2. tickTime=2000
  3. initLimit=1
  4. syncLimit=5
  5. dataDir=/data/zookeeper/data
  6. dataLogDir=/data/zookeeper/datalog
  7. clientPort=2181
  8. admin.serverPort=8888
  9. maxClientCnxns=3000
  10. autopurge.snapRetainCount=3
  11. autopurge.purgeInterval=24
  12. server.1=192.168.100.110:2888:3888
  13. server.2=192.168.100.111:2888:3888
  14. server.3=192.168.100.112:2888:3888
  15. 4lw.commands.whitelist=conf,stat,srvr,mntr.envi
  16. #zk SASL
  17. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  18. jaasLoginRenew=3600000
  19. requireClientAuthScheme=sasl
  20. zookeeper.sasl.client=true

2. 配置zookeeper JAAS

  1. cat zk_jaas.conf
  2. Server {
  3. org.apache.zookeeper.server.auth.DigestLoginModule required
  4. username="admin"
  5. password="admin123"
  6. user_kafka="kafka123";
  7. };
  8. 注意:
  9. admin用户 是zk 集群之间使用的。
  10. kafka用户 是 broker 与 zk 之间使用的。

3. 修改zkEnv.sh

  1. 将上一步添加的 jaas 配置文件添加到zookeeper的环境变量中,zkEnv.sh文件最后添加一行:
  2. vim zkEnv.sh
  3. ZOOBINDIR="${ZOOBINDIR:-/usr/bin}"
  4. ZOOKEEPER_PREFIX="${ZOOBINDIR}/.."
  5. # 添加如下 新增变量SERVER_JVMFLAGS:
  6. export SERVER_JVMFLAGS="-Djava.security.auth.login.config=../conf/zk_jaas.conf"

4. 配置环境变量,启动zk服务

  1. vim /etc/profile
  2. # zk
  3. export PATH=/home/zookeeper/bin:$PATH
  4. source /etc/profile
  5. # 启动服务
  6. zkServer.sh start && zkServer.sh status

5. 安装zkui图形界面管理zk

  1. # 拉取代码
  2. git clone https://github.com/DeemOpen/zkui.git
  3. # 安装maven并配置阿里云源
  4. 从这里下载:https://maven.apache.org/
  5. wget https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz
  6. tar -zxvf apache-maven-3.9.4-bin.tar.gz
  7. echo 'export PATH=/home/apache-maven-3.9.4-bin:$PATH' >>/etc/profile
  8. source /etc/profile
  9. mvn -v
  10. vim settings.xml
  11. 找到</mirrors>节点添加即可
  12. <mirror>
  13. <id>alimaven</id>
  14. <name>aliyun maven</name>
  15. <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  16. <mirrorOf>central</mirrorOf>
  17. </mirror>
  18. -----------------------------------
  19. # 构建jar包
  20. cd zkui/ && mvn clean install -DskipTests
  21. mkdir /data/zkui
  22. cp zkui/target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar /data/zkui/
  23. cp zkui/config.cfg /data/zkui/
  24. cd /data/zkui
  25. nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
  26. tail -n 100 -f nohup.out
  27. -----------------------------------
  28. http://192.168.100.113:9090/login
  29. 账号:admin
  30. 密码:manager
  31. 基本操作
  32. 通过UI界面操作尽量规避掉人为操作的多种不确定性因素导致生产故障
  33. 浏览器访问,下面的用户名和密码提示是config.cfg中loginMessage字段写死的生产中修改掉即可

配置kafka sasl动态认证

  SASL/SCRAM认证是把凭证(credential)存储在Zookeeper,使用kafka-configs.sh在Zookeeper中创建凭据。对于每个SCRAM机制,必须添加具有机制名称的配置来创建凭证,所以在启动Kafka broker之前需要创建代理间通信的凭据。

  这里配置的 Kafka和生产者/消费者之间 采用SASL/PLAIN和SASL/SCRAM两种方式共同完成认证,授权使用ACL方式。PLAIN方式的用户是在jaas文件中写死的,不能动态的添加;SCRAM支持动态的添加用户。

1. 创建用户

  配置SASL/SCRAM认证的第一步,是配置可以连接到kafka集群的用户。本案例创建了3个用户:admin,producer,consumer。kafka_server_admin用户用于broker之间的认证通信,producer用户用于生产者连接kafka,consumer用户用于消费者连接kafka 。

  1. kafka-configs.sh --zookeeper 192.168.100.110:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name admin
  2. kafka-configs.sh --zookeeper 192.168.100.110:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name producer
  3. kafka-configs.sh --zookeeper 192.168.100.110:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name consumer
  1. Completed Updating config for entity: user-principal 'admin'. #正常情况打印信息这串信息。
  2. PS:用户 admin 这里配置admin用户用于实现broker间的通讯。

2. 查看创建的用户信息

kafka-configs 脚本是用来设置主题级别参数的。其实,它的功能还有很多。比如在这个例子中,我们使用它来创建 SASL/SCRAM 认证中的用户信息。可以使用下列命令来查看刚才创建的用户数据。

  1. kafka-configs.sh --zookeeper 192.168.100.110:2181 --describe --entity-type users
  2. #(可以单独指定某个用户 --entity-name producer,如下)
  3. kafka-configs.sh --zookeeper 192.168.100.110:2181 --describe --entity-type users --entity-name producer

 输出示例如下:

Configs for user-principal 'producer' are SCRAM-SHA-512=salt=czVldW1mNDRlcTgzdnJydWxrOXB0YThxbw==,stored_key=Hjrex55CEcLSA2b1n8bvL5CfjziLPHD8EoduoCkNrT0xcVDthoQSi4hvt7szU55pJP/LTbpQkZdV66nueVzKmg==,server_key=OcXwU1KyxBU1tUdlu9ikpfwl+y+ne121iD4amN7zKl8I84KeBZlrwz1IKB1ICFFiAP9XoRRj0QMgHbCfhsL/wA==,iterations=4096,SCRAM-SHA-256=salt=NDJzZmZ0MmhxMDI0N2ZrbXh6bTQwYnZzcA==,stored_key=EIzpjfD4JzItzjeh16UvpQEGyYoZbesR0GRsQB58Als=,server_key=5LAvxe/KjCas7w4GM+KlkT/il99peOozXY/HPZeSF14=,iterations=8192

这段命令包含了 writer 用户加密算法 SCRAM-SHA-256 以及 SCRAM-SHA-512 对应的盐值 (Salt)、ServerKey 和 StoreKey,这些都是 SCRAM 机制的术语。

  1. ZK客户端命令行查看:
  2. zkCli.sh -server 192.168.100.110:2181
  3. ls /config/users
  1. 删除用户[producer]的SCRAM证书:
  2. kafka-configs.sh --zookeeper 192.168.100.110:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer

3. 配置kafka jaas文件

   配置了用户之后,我们需要为 Broker 创建一个对应的 JAAS 文件。在实际场景中,需要为每台单独的物理 Broker 机器都创建一份 JAAS 文件。

Kafka 的 jaas认证配置文件,配置的是登录类,超管密码和管理的帐号密码列表

  1. vim kafka_server_jaas.conf
  2. KafkaServer {
  3. org.apache.kafka.common.security.scram.ScramLoginModule required
  4. username ="admin"
  5. password="admin123"
  6. user_admin="admin123"
  7. user_producer="producer123"
  8. user_consumer="consumer123";
  9. };
  10. KafkaClient {
  11. org.apache.kafka.common.security.scram.ScramLoginModule required
  12. username="admin"
  13. password="admin123"
  14. user_producer="producer123"
  15. user_consumer="consumer123";
  16. };
  17. Client {
  18. org.apache.kafka.common.security.scram.ScramLoginModule required
  19. username="kafka"
  20. password="kafka123";
  21. };
  22. -----------------------------------
  23. KafkaServer中usename配置的是kafka服务端使用的账号和密码,后面的user_xxx事预设的普通帐号认证信息。
  24. 中间部分配置的是PLAIN认证方式的账户和密码,其中producer1是账户名,producer123是密码。
  25. Client配置了broker到Zookeeper的连接用户名密码,这里要和前面zookeeper配置中的zk_jaas.conf.conf 中 user_kafka 的账号和密码相同。
  26. 关于这个文件内容,需要注意以下两点:
  27. 1)不要忘记最后一行和倒数第二行结尾处的分号;
  28. 2)JAAS 文件中不需要任何空格键。

4. kafka 配置文件启用SASL认证

  1. Kafka 服务配置文件 server.propertis,配置认证协议及认证实现类
  2. # -----------------节点1
  3. cat server.properties
  4. broker.id=0
  5. listeners=SASL_PLAINTEXT://:9092
  6. advertised.listeners=SASL_PLAINTEXT://192.168.100.110:9092
  7. sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
  8. sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
  9. security.inter.broker.protocol=SASL_PLAINTEXT
  10. allow.everyone.if.no.acl.found=false
  11. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  12. super.users=User:admin
  13. num.network.threads=3
  14. num.io.threads=8
  15. socket.send.buffer.bytes=102400
  16. socket.receive.buffer.bytes=102400
  17. socket.request.max.bytes=104857600
  18. log.dirs=/data/kafka/datalogs
  19. num.partitions=3
  20. num.recovery.threads.per.data.dir=1
  21. offsets.topic.replication.factor=2
  22. transaction.state.log.replication.factor=1
  23. transaction.state.log.min.isr=1
  24. log.flush.interval.messages=10000
  25. log.flush.interval.ms=1000
  26. log.retention.hours=168
  27. log.retention.bytes=1073741824
  28. log.segment.bytes=1073741824
  29. log.retention.check.interval.ms=300000
  30. delete.topic.enable=true
  31. auto.create.topics.enable=false
  32. zookeeper.connect=192.168.100.110:2181,192.168.100.111:2181,192.168.100.112:2181
  33. zookeeper.connection.timeout.ms=60000
  34. group.initial.rebalance.delay.ms=0
  35. # -------------------节点2
  36. cat server.properties
  37. broker.id=1
  38. listeners=SASL_PLAINTEXT://:9092
  39. advertised.listeners=SASL_PLAINTEXT://192.168.100.111:9092
  40. sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
  41. sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
  42. security.inter.broker.protocol=SASL_PLAINTEXT
  43. allow.everyone.if.no.acl.found=false
  44. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  45. super.users=User:admin
  46. num.network.threads=3
  47. num.io.threads=8
  48. socket.send.buffer.bytes=102400
  49. socket.receive.buffer.bytes=102400
  50. socket.request.max.bytes=104857600
  51. log.dirs=/data/kafka/datalogs
  52. num.partitions=3
  53. num.recovery.threads.per.data.dir=1
  54. offsets.topic.replication.factor=2
  55. transaction.state.log.replication.factor=1
  56. transaction.state.log.min.isr=1
  57. log.flush.interval.messages=10000
  58. log.flush.interval.ms=1000
  59. log.retention.hours=168
  60. log.retention.bytes=1073741824
  61. log.segment.bytes=1073741824
  62. log.retention.check.interval.ms=300000
  63. delete.topic.enable=true
  64. auto.create.topics.enable=false
  65. zookeeper.connect=192.168.100.110:2181,192.168.100.111:2181,192.168.100.112:2181
  66. zookeeper.connection.timeout.ms=60000
  67. group.initial.rebalance.delay.ms=0
  68. # -----------------------节点3
  69. cat server.properties
  70. broker.id=2
  71. listeners=SASL_PLAINTEXT://:9092
  72. advertised.listeners=SASL_PLAINTEXT://192.168.100.112:9092
  73. sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
  74. sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
  75. security.inter.broker.protocol=SASL_PLAINTEXT
  76. allow.everyone.if.no.acl.found=false
  77. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  78. super.users=User:admin
  79. num.network.threads=3
  80. num.io.threads=8
  81. socket.send.buffer.bytes=102400
  82. socket.receive.buffer.bytes=102400
  83. socket.request.max.bytes=104857600
  84. log.dirs=/data/kafka/datalogs
  85. num.partitions=3
  86. num.recovery.threads.per.data.dir=1
  87. offsets.topic.replication.factor=3
  88. transaction.state.log.replication.factor=1
  89. transaction.state.log.min.isr=1
  90. log.flush.interval.messages=10000
  91. log.flush.interval.ms=1000
  92. log.retention.hours=168
  93. log.retention.bytes=1073741824
  94. log.segment.bytes=1073741824
  95. log.retention.check.interval.ms=300000
  96. delete.topic.enable=true
  97. auto.create.topics.enable=false
  98. zookeeper.connect=192.168.100.110:2181,192.168.100.111:2181,192.168.100.112:2181
  99. zookeeper.connection.timeout.ms=60000
  100. group.initial.rebalance.delay.ms=0

5. kafka 启动脚本添加认证文件路径的环境变量

Kafka 安全认证可以直接通过环境变量 -Djava.security.auth.login.config 设置,修改 Kafka 启动脚本 kafka-start-server.sh 文件最后一行,增加一个参数指向 jaas 配置文件的绝对路径

  1. vim kafka-server-start.sh
  2. #exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
  3. exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/path-to-kafka/config/kafka_server_jaas.conf kafka.Kafka "$@"

 6.  kafka客户端配置

1) 配置consumer.properties和producer.properties,都要加入以下配置

  1. ecurity.protocol=SASL_PLAINTEXT
  2. sasl.mechanism=SCRAM-SHA-512

2) 生产者配置

使用kafka-console-producer.sh脚本测试生产者,由于开启安全认证和授权,此时使用console-producer脚本来尝试发送消息,那么消息会发送失败,原因是没有指定合法的认证用户,因此客户端需要做相应的配置,需要创建一个名为producer.conf的配置文件给producer程序使用

config目录下创建一个producer.conf的文件,文件内容如下:

  1. cat producer.conf
  2. security.protocol=SASL_PLAINTEXT
  3. sasl.mechanism=SCRAM-SHA-256
  4. sasl.jaas.config=org.apache.kafka.common.security.scram.Scra
  5. mLoginModule required username="producer" password="producer123";

注意:Topic设置写权限

3) 消费者配置

使用kafka-console-consumer.sh脚本测试生产者,由于开启安全认证和授权,因此客户端需要做相应的配置。需要为 consumer 用户创建consumer.conf给消费者程序,同时设置对topic的读权限。

config目录下创建一个consumer.conf的文件,文件内容如下:

  1. vim consumer.conf
  2. security.protocol=SASL_PLAINTEXT
  3. sasl.mechanism=SCRAM-SHA-256
  4. sasl.jaas.config=org.apache.kafka.common.security.scram.Scra
  5. mLoginModule required username="consumer" password="consumer123";

 注意:Topic设置读权限。

4) 在生产者和消费者启动脚本中引入JAAS文件

  1. vim bin/kafka-console-producer.sh
  2. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  3. export KAFKA_HEAP_OPTS="-Xmx512M"
  4. fi
  5. # 添加这行
  6. export KAFKA_OPTS="-Djava.security.auth.login.config=/path-to-kafka/config/kafka_server_jaas.conf"
  7. exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
  8. # vim bin/kafka-console-consumer.sh
  9. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  10. export KAFKA_HEAP_OPTS="-Xmx512M"
  11. fi
  12. # 添加这行
  13. export KAFKA_OPTS="-Djava.security.auth.login.config=/path-to-kafka/config/kafka_server_jaas.conf"
  14. exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

 启动kafka集群

nohup kafka-server-start.sh /path-to-kafka/config/server.properties &
  1. # 检查启动日志
  2. tail -n 100 -f nohup.out

检查验证

  1. zkCli.sh -server localhost:2181
  2. ls /brokers/ids

1. 生产者测试

1) 创建一个测试主题test_topic

kafka-topics.sh --zookeeper 192.168.100.112:2181 --create --replication-factor 3 --partitions 3 --topic test_topic

2)查看创建的Topic

  1. kafka-topics.sh --list --zookeeper 192.168.100.112:2181 test_topic
  2. kafka-topics.sh --describe --zookeeper 192.168.100.112:2181 --topic test_topic

注:kafka开启认证后,当生产者往Topic写数据需要为主题配置权限(write),即对生产者赋予写的权限。

这里使用producer用户认证授权,通过ACL为producer 用户分配操作test_topic权限(下面两个命令二选一即可):

  1. 分配写的权限
  2. kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.112:2181 --add --allow-
  3. principal User:producer --operation Write --topic 'test_topic'
  • 启动生产者发送消息

特别注意:在生产者生产消息之前如果不设置生产者用户的ACL权限会报错:

  1. kafka-console-producer.sh --broker-list 192.168.100.112:9092 --topic test_topic --producer.config /path-to-kafka/config/producer.conf
  2. >hello
  3. >world
  4. >end

2. 消费者测试

注:kafka开启认证后,当消费者拉取主题的消息时需要为Topic配置权限(read),即对生产者赋予写的权限。

这里使用consumer用户认证授权,通过ACL为consumer用户分配操作test_topic权限,同时需要对消费者组授权(给groupId配权),执行一下2个步骤:

  1. 1)分配读的权限
  2. kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.112:2181 --add --allow-principal User:consumer--operation Read --topic 'test_topic'
  3. 2)分配consumer的权限
  4. kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.112:2181 --add --allow-
  5. principal User:consumer--consumer --topic test_topic --group test_group

启动消费者消费消息:

读取消息前,如果对开启消费者用户读取topic权限,会报错:

  1. #kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.112:9092,192.xxx.xxx.110:9092,192.xxx.xxx.111:9092 --topic test --group test-group --from-beginning --consumer.config /path-to-kafka/config/consumer.conf
  2. kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.112:9092 --topic test_topic --group test-group --from-beginning --consumer.config /path-to-kafka/config/consumer.conf
  3. hello
  4. world
  5. end

案例一则

  1. 1)创建Topic
  2. kafka-topics.sh --zookeeper 192.168.100.110:2181 --create --partitions 2 --replication-factor 1 --topic test1
  3. 2)生产者端配置
  4. #添加写权限
  5. kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.100.110:2181 --add --allow-principal User:producer --operation Write --topic test1
  6. #写入消息
  7. kafka-console-producer.sh --broker-list 192.168.100.112:9092,192.168.100.111:9092,192.168.100.110:9092 --topic test1
  8. --producer.config /path-to-kafka/config/producer.conf
  9. >hello
  10. >world
  11. 3)消费者端配置
  12. #添加读权限
  13. kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-
  14. properties zookeeper.connect=192.168.100.110:2181 --add --allow-principal User:consumer --operation Read --topic test1
  15. #添加消费者组权限
  16. kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-
  17. properties zookeeper.connect=192.168.100.110:2181 --add --allow-principal User:consumer --operation Read --group test-group
  18. #拉取消息
  19. kafka-console-consumer.sh --bootstrap-server 192.168.100.112:9092,192.168.100.110:9092,192.168.100.111:9092
  20. --topic test1 --from-beginning --consumer.config
  21. /path-to-kafka/config/consumer.conf --group test-group
  22. hello
  23. World

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/590863
推荐阅读
相关标签
  

闽ICP备14008679号