当前位置:   article > 正文

Windows下安装单机Kafka环境及配置SASL身份认证_window 配置 kafka jaas

window 配置 kafka jaas

0 安装JDK

zookeeper和kafka都是java开发的,所以安装前先安装1.8版本以上的jdk,并设置环境变量 JAVA_HOME=d:\env\Java\jdk1.8.0_14

1 安装Zookeeper 

1.1 Apache ZooKeeper点击下载地址 Apache ZooKeeper,下载最新版本zookeeper压缩包,解压到本地

1.2 来到 conf文件夹下,复制一份 zoo_sample.cfg ,改名为 zoo.cfg

1.3 在安装目录下新建data 和 logs 文件夹,打开 zoo.cfg ,添加 两条配置(按自己安装的地址填写)

  1. dataDir=D:\\env\\apache-zookeeper-3.6.3-bin\\data
  2. dataLogDir=D:\\env\\apache-zookeeper-3.6.3-bin\\logs

1.4 点击 bin 目录下的zkServer.cmd,开启zookeeper服务端

注:不要关闭此窗口

1.5 点击 bin目录下 zkCli.cmd,开启客户端,出现红色框中的提示表示zookeeper已经安装成功

2 安装Kafka

 2.1 点击 Apache Kafka 下载最新版本的kafka

 2.2 解压到本地,在安装目录下新建文件夹 kafka-logs ,进入到config目录打开 server.properties,添加两行配置 (按自己安装的地址填写)

log.dirs=D:\\env\\kafka_2.13-2.8.0\\kafka-logs

zookeeper.connect=localhost:2181

 2.3 在安装目录下打开CMD,输入

.\bin\windows\kafka-server-start.bat .\config\server.properties

 没有出现错误提示,说明kafka服务已经启动,注意不要关闭此窗口

3 测试kafka

3.1 创建主题 

在D:\env\kafka_2.13-2.8.0\bin\windows 下 打开CMD,输入 

.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

--create:创建参数     --zookeeper localhost:2181:zookeeper地址    --replication-factor 1:副本数量    --partitions 1:分区数量    --topic test01 :主题名称

3.2 查看主题列表

输入

.\kafka-topics.bat --list --zookeeper localhost:2181

 

 3.3 生产消息

输入 

.\kafka-console-producer.bat --broker-list localhost:9092 --topic test1

3.4 消费消息 

输入 

.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

 

--from-beginning :表示每次从头开始消费,不填写从当前偏移量开始消费

4 安装kafka可视化管理工具 kafka-manager

4.1 编译好的资源,百度云盘下载链接:https://pan.baidu.com/s/1twhwfILRo9ReCAYczd44Fw 密码:kjqh,解压到本地打开application.conf文件

添加 kafka-manager.zkhosts="localhost:2181"

4.2 在zookeeper 和 kafka都开启的情况下 ,来到 kafka-manager安装目录下打开CMD,输入 ./bin/kafka-manager

4.3 在浏览器内开打  localhost:9000

4.4 创建实例,在zook hosts中输入地址 localhost:2181,其他选项默认

 5 配置kafka身份认证 SASL

5.1 ZooKeeper 配置 SASL

5.1.1 在zookeeper目录的conf文件夹下新建 zoo_jaas.conf 文件

  1. Server {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"
  4. password="admin"
  5. user_admin="admin";
  6. };

Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zk 节点该属性一致即可
Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名

5.1.2 配置 zoo.conf 文件,加入以下配置

  1. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  2. requireClientAuthScheme=sasl
  3. jaasLoginRenew=3600000
  4. zookeeper.sasl.client=true

5.1.3 在kafka安装目录下找到 zookeeper-server-start.bat,新增一个配置KAFKA_OPTS

  1. SetLocal
  2. set KAFKA_OPTS=-Djava.security.auth.login.config=D:/env/apache-zookeeper-3.6.3-bin/conf/zoo_jaas.conf
  3. ......
  4. EndLocal

 5.2 kafka 配置 SASL

5.2.1 在kafka目录下config下新建  kafka_server_jaas.conf 文件,加入以下配置

  1. # 定义kafka客户端与broker的认知信息
  2. KafkaServer {
  3. org.apache.kafka.common.security.plain.PlainLoginModule required
  4. username="admin"
  5. password="admin"
  6. user_admin="admin"
  7. user_producer="producer@123"
  8. user_consumer="consumer@123";
  9. };
  10. # 用于broker和zookeeper之间的认证,对应zk_server_jass.conf中的【user_admin="admin"】配置
  11. KafkaClient {
  12. org.apache.kafka.common.security.plain.PlainLoginModule required
  13. username="admin"
  14. password="admin";
  15. };

KafkaServer.username、KafkaServer.password 为 broker 内部通信的用户名密码,同上
KafkaServer.user_xxx 其中 xxx 必须和 KafkaServer.username 配置的用户名一致,密码也一致
KafkaServer.user_producer、KafkaServer.user_consumer 为了之后的 ACL 做准备,达到消费者生产者使用不同账号且消费者账号只能消费数据,生产者账号只能生产数据

5.2.2 修改 server.properties 文件

  1. listeners=SASL_PLAINTEXT://localhost:9092
  2. #使用的认证协议
  3. security.inter.broker.protocol=SASL_PLAINTEXT
  4. #SASL机制
  5. sasl.enabled.mechanisms=PLAIN
  6. sasl.mechanism.inter.broker.protocol=PLAIN
  7. # 完成身份验证的类
  8. #authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  9. # 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
  10. allow.everyone.if.no.acl.found=false
  11. #超级管理员权限用户
  12. super.users=User:admin
  13. advertised.listeners=SASL_PLAINTEXT://localhost:9092

5.2.3 修改 kafka-server-start.bat文件,使之加载到 kafka_server_jaas.conf 文件

  1. SetLocal
  2. set KAFKA_OPTS= -Djava.security.auth.login.config=D:/env/kafka_2.13-2.8.0/config/kafka_server_jaas.conf
  3. ......
  4. EndLocal

5.2.4 producer的sasl配置 producer.properties新增配置

  1. sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer@123";
  2. security.protocol=SASL_PLAINTEXT
  3. sasl.mechanism=PLAIN

5.2.5 producer的sasl配置 consumer.properties新增配置

  1. sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="consumer@123";
  2. security.protocol=SASL_PLAINTEXT
  3. sasl.mechanism=PLAIN

5.2.6 新建 sasl.properties

  1. security.protocol=SASL_PLAINTEXT
  2. sasl.mechanism=PLAIN
  3. sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

5.2.6 启动命令【示例】 zk:

  1. #启动kafka服务
  2. .\bin\windows\kafka-server-start.bat .\config\server.properties
  1. #生产主题
  2. .\kafka-console-producer.bat --broker-list localhost:9092 --topic test1 --producer.config ..\..\config\producer.properties
  1. #消费主题
  2. .\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning --consumer.config ..\..\config\consumer.properties
  1. #查看消费情况
  2. .\kafka-consumer-groups.bat --describe --bootstrap-server localhost:9092 --group test-consumer-group --command-config ..\..\config\sasl.properties

5.3 java api 验证

5.3.1 在application.yml中加入如下配置

  1. kafka:
  2. bootstrap-servers: localhost:9092
  3. properties:
  4. sasl:
  5. mechanism: PLAIN
  6. jaas:
  7. config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";'
  8. security:
  9. protocol: SASL_PLAINTEXT

5.3.2 编写 producer和 consumer 

  1. public static final String COLLECTOR_DATA_TOPIC = "test01";
  2. private static final String COLLECTOR_DATA_CONSUMER_DEV_GROUP = "collector_data_dev_consumer";
  3. @Autowired
  4. CollectorService collectorService;
  5. @KafkaListener(topics = COLLECTOR_DATA_TOPIC, groupId = COLLECTOR_DATA_CONSUMER_DEV_GROUP)
  6. public void collectorConsumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
  7. collectorService.collectorConsumer(record, consumer);
  8. }

  1. package com.xxx.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONException;
  5. import com.alibaba.fastjson.JSONObject;
  6. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  7. import org.apache.kafka.clients.consumer.Consumer;
  8. import org.apache.kafka.clients.consumer.ConsumerRecord;
  9. import org.elasticsearch.action.index.IndexRequest;
  10. import org.elasticsearch.common.xcontent.XContentType;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.kafka.core.KafkaTemplate;
  13. import org.springframework.stereotype.Service;
  14. import org.springframework.util.StringUtils;
  15. import javax.annotation.Resource;
  16. import java.time.LocalDateTime;
  17. import java.time.format.DateTimeFormatter;
  18. import java.util.List;
  19. import java.util.Set;
  20. @Service
  21. public class CollectorServiceImpl extends ServiceImpl<CollectorMapper, CollectorDO> implements CollectorService {
  22. public static final String CHINT_DATA_TOPIC = "test01";
  23. @Autowired
  24. private CollectorMapper collectorMapper;
  25. @Resource
  26. private KafkaTemplate<String,String> kafkaTemplate;
  27. @Override
  28. public ResultUtil collectorProducer(String body) {
  29. //推送数据为空返回
  30. if (!StringUtils.hasText(String.valueOf(body)) || String.valueOf(body).equals("[{}]") || String.valueOf(body).equals("[]") || String.valueOf(body).equals("{}")) {
  31. return ResultUtil.success(MessageEnum.COLLECTOR_API_NODATA);
  32. }
  33. //保存当前批次的推送数据
  34. try {
  35. JSONArray arrayObjects = JSON.parseArray(body);
  36. for(int i = 0; i < arrayObjects.size(); i++){
  37. JSONObject arrayObject = (JSONObject) arrayObjects.get(i);
  38. JSONObject jsonObject = JSON.parseObject(String.valueOf(arrayObject));
  39. //数据推向kafka生产
  40. kafkaTemplate.send(CHINT_DATA_TOPIC, String.valueOf(jsonObject));
  41. }
  42. // TODO: 2023/4/11 添加其他数据导入流向
  43. } catch (Exception e) {
  44. // TODO: 2023/4/11 存入日志
  45. e.printStackTrace();
  46. //推送数据格式错误导致数据存储失败返回
  47. return ResultUtil.error(MessageEnum.COLLECTOR_API_INVALID_MESSAGE);
  48. }
  49. return ResultUtil.success(MessageEnum.COLLECTOR_API_ACCEPTED);
  50. }
  51. public void collectorConsumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer){
  52. // System.out.println(record.value());
  53. CollectorDO collector = new CollectorDO();
  54. //将系统时间戳作为接收到推送的批次
  55. Long tag = System.currentTimeMillis();
  56. //记录推送批次便于查询
  57. collectorMapper.insertCollectorTag(tag);
  58. //持久化数据
  59. try {
  60. JSONObject jsonObject = JSON.parseObject(record.value());
  61. Set<String> keys = jsonObject.keySet();
  62. for (String key : keys) {
  63. collector.setCollectorField(key);
  64. collector.setCollectorValue((String)jsonObject.get(key));
  65. collector.setTag(tag);
  66. collectorMapper.insert(collector);
  67. }
  68. } catch (JSONException e) {
  69. consumer.commitAsync();
  70. } catch (Exception e) {
  71. e.printStackTrace();
  72. }
  73. }
  74. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/701459
推荐阅读
相关标签
  

闽ICP备14008679号