赞
踩
目录
2.3 修改 zookeeper.properties 配置文件
本片是小白入门篇,所以我们不以Linux操作系统为例,选择大多数小白都用的windows。
ksfka下载链接如下,点击链接进入官网即可下载
温馨提示:JDK版本至少需要1.8,高版本也可兼容;
Apache Kafkahttps://kafka.apache.org/downloads本篇中以kafka_2.1.3-3.6.1版本为例,直接点击对应的版本下载即可,tgz包就类似于我们常见的zip,下载完成之后解压即可。
下载完毕,我们就可以解压得到 kafka 了
解压之后就可以得到 kafka 文件了
打开文件夹后可以发现内部含有bin文件夹,config配置夹,libs依赖夹等,和JDK,maven 问价夹的格式如出一辙;
后续需要用来存放日志文件,只要创建完成就可以了,kafka启动后会自动生成日志文件;
我们点击进入config文件夹,找到 zookeeper.properties 配置文件,双击进行修改,
然后,我们找到 dataDir ,将它的值修改为我们刚才创建的 data 文件的路径,还要注意一点,在后面还要多加一个 "/zk",因为一会还要配置 server.properties ,所以要用将她们两个区分开,
和刚才一样,我们双击修改 "server.properties" 配置文件
我们修改 log.dirs 的值为刚才创建的 data 文件夹的路径,在路径末尾再添加上 "/kafka" ,用来和刚才的zk做区分,kafka 文件夹用来存放kafka的日志文件,zk 文件夹用来存放zoopeeper的日志文件;
以记事本的方式打开,然后加入下面这句话,
这句话的含义就是启动 Zookeeper ,并且启动文件为 "zookeeper.properties" ;
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
仍然以记事本的方式打开,然后加入下面这句话,
这句话的含义就是启动 kafka ,并且启动文件为 "server.properties" ;
call bin/windows/kafka-server-start.bat config/server.properties
此时,我们的 kafka 文件夹中就多了我们刚刚创建的 data 文件夹,kafka.cmd 脚本文件,zk.cmd 脚本文件;
经过第二部的配置,现在一切都已经准备就绪,我们只需要双击 zk.cmd 和 kafka.cmd 脚本文件启动kafka;
这里需要注意一点,必须先启动双击 zk.cmd 启动 zookeeper,
再双击 kafka.cmd 启动 kafka,关闭的时候,需要先关闭 kafka,再关闭 zookeeper ;
我们来到 bin 文件夹下的 windows 文件夹,打开 cmd 命令窗口,运行下方命令
- # --bootstrap-server localhost:9092 配置服务器连接,此处为本机,9092为kafka默认端口号
- # --topic test 创建topic主题,主题名称为 test
- # --create 创建topic主题命令
- kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create
仍然是在 windows 文件夹下新建一个命令窗口,刚才我们已经创建出了名为 "topic" 的主题,现在运行如下命令启动脚本文件创建生产者连接上我们的 topic 主题
- # 运行 kafka-console-producer.bat 脚本创建生产者连接本机9092端口名为 test 的主题
- kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
运行如下所示,会出现一个小箭头,就说明我们链接主题成功,我们生产者发布的主题都会发送到 topic 主题中供消费者去消费使用;
- # 运行 kafka-console-consumer.bat 脚本创建消费者连接本机9092端口名为 test 的主题
- windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
运行成功不会有任何显示,
如下图所示,我在生产者命令窗口输入 "hello kafka",点击回车,我们就可以在消费者中命令窗口中看到发送过来的 "hello kafka" 消息
在 maven 项目的 pom.xml 文件中加入下方依赖,
- <dependencies>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>3.6.1</version>
- </dependency>
- </dependencies>
我们随便创建一个类即可,名字随意取,代码逻辑备有注释;
- public class KafkaProducerTest {
- public static void main(String[] args) {
- // TODO 创建配置对象
- // 创建生产者对象又分为两步
- // 1. 创建配置对象集合
- Map<String, Object> configMap = new HashMap<>();
- configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
- // 2. 配置数据 Key-Value 的序列化方式
- configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
-
- // TODO 创建生产者对象
- // 将配置对象集合作为参数传入
- // 返回值就是生产者对象
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(configMap);
-
- // TODO 创建数据
- // 自己定义一个数据,传入三个参数,第一个参数为主题,第二个参数为数据的key,第三个参数为数据的value
- ProducerRecord<String,String> record = new ProducerRecord<>("test","first","hello kafka");
-
- // TODO 发送数据
- kafkaProducer.send(record);
-
- // TODO 关闭生产者对象
- kafkaProducer.close();
- }
- }
- public class KafkaConsumerTest {
- public static void main(String[] args) {
- // TODO 创建消费者配置对象
- // 创建消费者配置对象集合
- HashMap<String, Object> consumerConfig = new HashMap<>();
- consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
- consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- // TODO 创建消费者对象
- KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(consumerConfig);
-
- // TODO 订阅消费主体,主题名称为 test
- consumer.subscribe(Collections.singletonList("test"));
-
- // TODO 从主题中获取数据消费
- final ConsumerRecords<String, String> datas = consumer.poll(100);
-
- for (ConsumerRecord<String,String> data : datas){
- System.out.println(data);
- }
-
- // TODO 关闭消费者对象
- consumer.close();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。