赞
踩
zookeeper和kafka都是java开发的,所以安装前先安装1.8版本以上的jdk,并设置环境变量 JAVA_HOME=d:\env\Java\jdk1.8.0_14
1.1 Apache ZooKeeper点击下载地址 Apache ZooKeeper,下载最新版本zookeeper压缩包,解压到本地
1.2 来到 conf文件夹下,复制一份 zoo_sample.cfg ,改名为 zoo.cfg
1.3 在安装目录下新建data 和 logs 文件夹,打开 zoo.cfg ,添加 两条配置(按自己安装的地址填写)
- dataDir=D:\\env\\apache-zookeeper-3.6.3-bin\\data
- dataLogDir=D:\\env\\apache-zookeeper-3.6.3-bin\\logs
1.4 点击 bin 目录下的zkServer.cmd,开启zookeeper服务端
注:不要关闭此窗口
1.5 点击 bin目录下 zkCli.cmd,开启客户端,出现红色框中的提示表示zookeeper已经安装成功
2.1 点击 Apache Kafka 下载最新版本的kafka
2.2 解压到本地,在安装目录下新建文件夹 kafka-logs ,进入到config目录打开 server.properties,添加两行配置 (按自己安装的地址填写)
log.dirs=D:\\env\\kafka_2.13-2.8.0\\kafka-logs
zookeeper.connect=localhost:2181
2.3 在安装目录下打开CMD,输入
.\bin\windows\kafka-server-start.bat .\config\server.properties
没有出现错误提示,说明kafka服务已经启动,注意不要关闭此窗口
3.1 创建主题
在D:\env\kafka_2.13-2.8.0\bin\windows 下 打开CMD,输入
.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
--create:创建参数 --zookeeper localhost:2181:zookeeper地址 --replication-factor 1:副本数量 --partitions 1:分区数量 --topic test01 :主题名称
3.2 查看主题列表
输入
.\kafka-topics.bat --list --zookeeper localhost:2181
3.3 生产消息
输入
.\kafka-console-producer.bat --broker-list localhost:9092 --topic test1
3.4 消费消息
输入
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning
--from-beginning :表示每次从头开始消费,不填写从当前偏移量开始消费
4.1 编译好的资源,百度云盘下载链接:https://pan.baidu.com/s/1twhwfILRo9ReCAYczd44Fw 密码:kjqh,解压到本地打开application.conf
文件
添加 kafka-manager.zkhosts="localhost:2181"
4.2 在zookeeper 和 kafka都开启的情况下 ,来到 kafka-manager安装目录下打开CMD,输入 ./bin/kafka-manager
4.3 在浏览器内开打 localhost:9000
4.4 创建实例,在zook hosts中输入地址 localhost:2181,其他选项默认
5.1.1 在zookeeper目录的conf文件夹下新建 zoo_jaas.conf 文件
- Server {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="admin"
- user_admin="admin";
- };
Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zk 节点该属性一致即可
Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名
5.1.2 配置 zoo.conf 文件,加入以下配置
- authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
- requireClientAuthScheme=sasl
- jaasLoginRenew=3600000
- zookeeper.sasl.client=true
5.1.3 在kafka安装目录下找到 zookeeper-server-start.bat,新增一个配置KAFKA_OPTS
- SetLocal
-
- set KAFKA_OPTS=-Djava.security.auth.login.config=D:/env/apache-zookeeper-3.6.3-bin/conf/zoo_jaas.conf
- ......
-
- EndLocal
5.2.1 在kafka目录下config下新建 kafka_server_jaas.conf 文件,加入以下配置
- # 定义kafka客户端与broker的认知信息
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="admin"
- user_admin="admin"
- user_producer="producer@123"
- user_consumer="consumer@123";
- };
- # 用于broker和zookeeper之间的认证,对应zk_server_jass.conf中的【user_admin="admin"】配置
- KafkaClient {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="admin";
- };
KafkaServer.username、KafkaServer.password 为 broker 内部通信的用户名密码,同上
KafkaServer.user_xxx 其中 xxx 必须和 KafkaServer.username 配置的用户名一致,密码也一致
KafkaServer.user_producer、KafkaServer.user_consumer 为了之后的 ACL 做准备,达到消费者生产者使用不同账号且消费者账号只能消费数据,生产者账号只能生产数据
5.2.2 修改 server.properties 文件
- listeners=SASL_PLAINTEXT://localhost:9092
- #使用的认证协议
- security.inter.broker.protocol=SASL_PLAINTEXT
- #SASL机制
- sasl.enabled.mechanisms=PLAIN
- sasl.mechanism.inter.broker.protocol=PLAIN
- # 完成身份验证的类
- #authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
- # 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
- allow.everyone.if.no.acl.found=false
- #超级管理员权限用户
- super.users=User:admin
- advertised.listeners=SASL_PLAINTEXT://localhost:9092
5.2.3 修改 kafka-server-start.bat文件,使之加载到 kafka_server_jaas.conf 文件
- SetLocal
-
- set KAFKA_OPTS= -Djava.security.auth.login.config=D:/env/kafka_2.13-2.8.0/config/kafka_server_jaas.conf
- ......
-
- EndLocal
5.2.4 producer的sasl配置 producer.properties新增配置
- sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer@123";
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=PLAIN
5.2.5 producer的sasl配置 consumer.properties新增配置
- sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="consumer@123";
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=PLAIN
5.2.6 新建 sasl.properties
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=PLAIN
- sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
5.2.6 启动命令【示例】 zk:
- #启动kafka服务
- .\bin\windows\kafka-server-start.bat .\config\server.properties
- #生产主题
- .\kafka-console-producer.bat --broker-list localhost:9092 --topic test1 --producer.config ..\..\config\producer.properties
- #消费主题
- .\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning --consumer.config ..\..\config\consumer.properties
- #查看消费情况
- .\kafka-consumer-groups.bat --describe --bootstrap-server localhost:9092 --group test-consumer-group --command-config ..\..\config\sasl.properties
5.3.1 在application.yml中加入如下配置
- kafka:
- bootstrap-servers: localhost:9092
- properties:
- sasl:
- mechanism: PLAIN
- jaas:
- config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";'
- security:
- protocol: SASL_PLAINTEXT
5.3.2 编写 producer和 consumer
-
- public static final String COLLECTOR_DATA_TOPIC = "test01";
- private static final String COLLECTOR_DATA_CONSUMER_DEV_GROUP = "collector_data_dev_consumer";
-
-
- @Autowired
- CollectorService collectorService;
-
- @KafkaListener(topics = COLLECTOR_DATA_TOPIC, groupId = COLLECTOR_DATA_CONSUMER_DEV_GROUP)
- public void collectorConsumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
- collectorService.collectorConsumer(record, consumer);
- }
- package com.xxx.service.impl;
-
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONException;
- import com.alibaba.fastjson.JSONObject;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
- import org.springframework.util.StringUtils;
-
- import javax.annotation.Resource;
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.List;
- import java.util.Set;
-
- @Service
- public class CollectorServiceImpl extends ServiceImpl<CollectorMapper, CollectorDO> implements CollectorService {
-
- public static final String CHINT_DATA_TOPIC = "test01";
-
- @Autowired
- private CollectorMapper collectorMapper;
-
- @Resource
- private KafkaTemplate<String,String> kafkaTemplate;
-
- @Override
- public ResultUtil collectorProducer(String body) {
-
- //推送数据为空返回
- if (!StringUtils.hasText(String.valueOf(body)) || String.valueOf(body).equals("[{}]") || String.valueOf(body).equals("[]") || String.valueOf(body).equals("{}")) {
- return ResultUtil.success(MessageEnum.COLLECTOR_API_NODATA);
- }
-
- //保存当前批次的推送数据
- try {
- JSONArray arrayObjects = JSON.parseArray(body);
-
- for(int i = 0; i < arrayObjects.size(); i++){
- JSONObject arrayObject = (JSONObject) arrayObjects.get(i);
-
- JSONObject jsonObject = JSON.parseObject(String.valueOf(arrayObject));
-
- //数据推向kafka生产
- kafkaTemplate.send(CHINT_DATA_TOPIC, String.valueOf(jsonObject));
-
-
- }
- // TODO: 2023/4/11 添加其他数据导入流向
-
- } catch (Exception e) {
- // TODO: 2023/4/11 存入日志
- e.printStackTrace();
- //推送数据格式错误导致数据存储失败返回
- return ResultUtil.error(MessageEnum.COLLECTOR_API_INVALID_MESSAGE);
- }
- return ResultUtil.success(MessageEnum.COLLECTOR_API_ACCEPTED);
-
- }
-
- public void collectorConsumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer){
- // System.out.println(record.value());
-
- CollectorDO collector = new CollectorDO();
- //将系统时间戳作为接收到推送的批次
- Long tag = System.currentTimeMillis();
-
- //记录推送批次便于查询
- collectorMapper.insertCollectorTag(tag);
-
- //持久化数据
- try {
- JSONObject jsonObject = JSON.parseObject(record.value());
-
- Set<String> keys = jsonObject.keySet();
- for (String key : keys) {
- collector.setCollectorField(key);
- collector.setCollectorValue((String)jsonObject.get(key));
- collector.setTag(tag);
- collectorMapper.insert(collector);
- }
- } catch (JSONException e) {
- consumer.commitAsync();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
-
- }
-
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。