赞
踩
自 0.9.0.0 版本开始,Kafka 正式引入了认证机制,用于实现基础的安全用户认证,这是将 Kafka 上云或进行多租户管理的必要步骤。截止到 2.3 版本,Kafka 支持基于 SSL 和基于 SASL 的安全认证机制。
基于 SSL 的认证主要是指 Broker 和客户端的双路认证(2-way authentication)。 通常来说,SSL 加密(Encryption)已经启用了单向认证,即客户端认证 Broker 的证书(Certificate)。如果要做 SSL 认证,那么就要启用双路认证,也就是说 Broker 也要认证客户端的证书。
Kafka 还支持通过 SASL 做客户端认证。SASL 是提供认证和数据安全服务的框架。 Kafka 支持的 SASL 机制有 5 种,它们分别是在不同版本中被引入的。
目前来看,使用 SSL 做信道加密的情况更多一些,但使用 SSL 实现认证不如使用 SASL。毕竟,SASL 能够支持选择不同的实现机制,如 GSSAPI、SCRAM、PLAIN 等。因此,建议使用 SSL 来做通信加密,使用 SASL 来做 Kafka 的认证实现。
SASL/GSSAPI 主要是给 Kerberos 使用的。GSSAPI 适用于本身已经做了 Kerberos 认证的场景,这样的话,SASL/GSSAPI 可以实现无缝集成。
SASL/PLAIN 是一个简单的用户名 / 密码认证机制,通常与 SSL 加密搭配使用。对于一些小公司而言,搭建公司级的 Kerberos 可能并没有什么必要,他们的用户系统也不复杂,特别是访问 Kafka 集群的用户可能不是很多。对于 SASL/PLAIN 而言,这就是一个非常合适的应用场景。总体来说,SASL/PLAIN 的配置和运维成本相对较小,适合于小型公司中的 Kafka 集群。
SASL/PLAIN 有这样一个弊端:它不能动态地增减认证用户,必须重启 Kafka 集群才能令变更生效。因为所有认证用户信息全部保存在静态文件中,所以只能重启 Broker,才能重新加载变更后的静态文件。
SASL/SCRAM 通过将认证用户信息保存在 ZooKeeper 的方式,避免了动态修改需要重启 Broker 的弊端。在实际使用过程中,可以使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群。因此,如果打算使用 SASL/PLAIN,不妨改用 SASL/SCRAM 试试。不过要注意的是,后者是 0.10.2 版本引入的。
SASL/OAUTHBEARER 是 2.0 版本引入的新认证机制,主要是为了实现与 OAuth 2 框架的集成。 Kafka 不提倡单纯使用 OAUTHBEARER,因为它生成的不安全的 JSON Web Token,必须配以 SSL 加密才能用在生产环境中。
Delegation Token 是在 1.1 版本引入的,它是一种轻量级的认证机制,主要目的是补充现有的 SASL 或 SSL 认证。 如果要使用 Delegation Token,需要先配置好 SASL 认证,然后再利用 Kafka 提供的 API 去获取对应的 Delegation Token。这样,Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证)或传输 Keystore 文件(SSL 认证)。
测试环境是本地的两个 Broker 组成的 Kafka 集群,连接端口分别是 9092 和 9093。
第 1 步:创建认证用户
配置 SASL/SCRAM 的第一步,是创建能否连接 Kafka 集群的用户。创建 3 个用户,分别是 admin 用户、writer 用户和 reader 用户。admin 用户用于实现 Broker 间通信,writer 用户用于生产消息,reader 用户用于消费消息。
使用下面这 3 条命令,分别来创建它们:
bin/kafka-configs.sh --zookeeper localhost:2181
--alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]'
--entity-type users --entity-name admin
bin/kafka-configs.sh --zookeeper localhost:2181
--alter --add-config 'SCRAM-SHA-256=[password=writer],SCRAM-SHA-512=[password=writer]'
--entity-type users --entity-name writer
bin/kafka-configs.sh --zookeeper localhost:2181
--alter --add-config 'SCRAM-SHA-256=[password=reader],SCRAM-SHA-512=[password=reader]'
--entity-type users --entity-name reader
使用下列命令来查看刚才创建的用户数据:
bin/kafka-configs.sh --zookeeper localhost:2181
--describe --entity-type users --entity-name writer
第 2 步:创建 broker 认证文件(JAAS 文件)
配置了用户之后,需要为每个 Broker 创建一个对应的 JAAS 文件。因为本例中的两个 Broker 实例是在一台机器上,所以只需创建一份 JAAS 文件。
在实际场景中,需要为每台单独的物理 Broker 机器都创建一份 JAAS 文件。
JAAS 的文件内容如下:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
关于这个文件内容,需要注意以下两点:
接下来,配置 Broker 的 server.properties 文件,下面这些内容,是需要单独配置的:
# 表明开启 SCRAM 认证机制,并启用 SHA-256 算法
sasl.enabled.mechanisms=SCRAM-SHA-256
# Broker 间通信开启 SCRAM 认证,同样使用 SHA-256 算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# 表示 Broker 间通信不配置 SSL
security.inter.broker.protocol=SASL_PLAINTEXT
# 设置 listeners 使用 SASL_PLAINTEXT,不使用 SSL
listeners=SASL_PLAINTEXT://localhost:9092
第 3 步:启动 Broker
分别启动这两个 Broker。在启动时,需要指定 JAAS 文件的位置,如下所示:
$KAFKA_OPTS=-Djava.security.auth.login.config=<your_path>/kafka-broker.jaas bin/kafka-server-start.sh config/server1.properties
......
[2019-07-02 13:30:34,822] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-02 13:30:34,822] INFO Kafka startTimeMs: 1562045434820 (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-02 13:30:34,823] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
$KAFKA_OPTS=-Djava.security.auth.login.config=<your_path>/kafka-broker.jaas bin/kafka-server-start.sh config/server2.properties
......
[2019-07-02 13:32:31,976] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-02 13:32:31,976] INFO Kafka startTimeMs: 1562045551973 (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-02 13:32:31,978] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
第 4 步:(配置认证信息后)发送消息
在创建好测试主题之后,使用 kafka-console-producer 脚本来尝试发送消息。由于启用了认证,客户端需要做一些相应的配置。创建一个名为 producer.conf 的配置文件,内容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer";
之后运行 Console Producer 程序:
$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093
--topic test --producer.config <your_path>/producer.conf
>hello, world
>
第 5 步:(配置认证信息后)消费消息
使用 Console Consumer 程序来消费刚刚生产的消息。同样地,需要为 kafka-console-consumer 脚本创建一个名为 consumer.conf 的脚本,内容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";
之后运行 Console Consumer 程序:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093
--topic test --from-beginning --consumer.config <your_path>/consumer.conf
hello, world
第 6 步:动态增减用户
假设删除 writer 用户,同时又添加一个新用户:new_writer,执行的命令如下:
删除 writer 用户
bin/kafka-configs.sh --zookeeper localhost:2181
--alter --delete-config 'SCRAM-SHA-256'
--entity-type users --entity-name writer
bin/kafka-configs.sh --zookeeper localhost:2181
--alter --delete-config 'SCRAM-SHA-512'
--entity-type users --entity-name writer
添加一个新用户:new_writer
bin/kafka-configs.sh --zookeeper localhost:2181
--alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=new_writer]'
--entity-type users --entity-name new_writer
修改 producer.conf 的内容,改为指定新创建的 new_writer 用户,即可发送消息:
$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093
--topic test --producer.config <your_path>/producer.conf
>Good!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。