赞
踩
kafka的版本为2.12-2.8.0
新建kafka_server_jaas.conf文件,该文件为kafka服务使用。
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- serviceName="kafka"
- username="admin"
- password="admin123"
- user_admin="admin123"
- user_client="123456";
- };
serviceName:定义服务的名称
username:定义内部连接使用的用户名
password:定义内部连接使用的用户名对应的密码。
user_admin:定义用户名和密码,根据上面配置来看,user_admin中admin为用户名,后面的admin123为密码。
user_client:定义用户名和密码,根据上面配置来看,user_client中client为用户名,后面的123456为密码。
新建kafka_client_jaas.conf文件,该文件为客户端使用:
- KafkaClient {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="client"
- password="123456";
- };
username:为KafkaServer中的定义的用户名
password:为KafkaServer定义用户名的密码
修改配置文件kafka/config/server.properties
- #在文件中增加如下配置
- # 配置listener名字和协议的映射(所以它是一个key-value的map),key是“listener名字”,value是“协议名称”
- listener.security.protocol.map=LOCAL:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
- # 监听端口,根据上面定义的listener名称定义对应的端口
- listeners=LOCAL://172.16.180.42:9091,INTERNAL://kafka1:19091,EXTERNAL://kafka1:29091
- # 定义对外的端口信息。
- advertised.listeners=INTERNAL://kafka1:19091,EXTERNAL://kafka1:29091
- # 定义sasl的mechanisms
- sasl.enabled.mechanisms=PLAIN
- #定义内部broker使用的通讯协议
- inter.broker.listener.name=INTERNAL
- #完成身份验证的类
- authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
-
- #当没有找到ACL配置时,允许所有的访问操作。
- allow.everyone.if.no.acl.found=true
其中最重要的就是 listeners 和 advertised.listeners :集群启动时监听listeners配置的地址,并将 advertised.listeners 配置的地址写到Zookeeper里面,作为集群元数据的一部分。我们可以将客户端(生产者/消费者)连接Kafka集群进行操作的过程分成2步:
根据上面配置我们要把EXTERNAL对应的端口代理出去,并且使用账户密码的方式进行验证连接。所以我们对EXTERNAL配置的时候ip尽量使用hostName的方式。因为你配置的ip不一定是客户端直接使用的ip,就是说你这里配置本机的内网ip为192.16.1.102。但是客户端使用的时候可能使用公网的ip:xxx.xx.xxx.xx。这样会造成连接失败的问题。所以我们使用hostName配置的方式。在客户端使用的时候也用hostName的方式配置。
根据上面配置得到:内部端口使用19091免密连接。外部端口使用19092使用秘钥连接。
首先把kafka_server_jaas.conf放到kafka/config下。
编辑kafka/bin/kafka-server-start.sh
- #在顶部增加
- export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka/config/kafka_server_jaas.conf"
测试使用kafka自带脚本来测试。
在kafka/bin下有两个脚本:kafka-console-consumer.sh和kafka-console-producer.sh分别对应消费端和生产端。
首先把kafka_client_jaas.conf放到kafka/conf下,配置上面两个脚本增加如下:
- #在上面两个脚本上部增加如下声明
- export KAFKA_OPTS="-Djava.security.auth.login.config=kafka/conf/kafka_client_jaas.conf"
生产者启动如下:
- #使用免密的方式连接19091
- ./kafka-console-producer.sh --bootstrap-server kafka1:19091 --topic test
- #使用加密的方式连接29091
- ./kafka-console-producer.sh --bootstrap-server kafka1:29091 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
消费者启动如下:
- #使用免密的方式连接19091
- ./kafka-console-consumer.sh --bootstrap-server kafka1:19091 --topic test
- #使用加密的方式连接29091
- ./kafka-console-consumer.sh --bootstrap-server kafka1:29091 --topic test --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
参考文章:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。