赞
踩
本文记录kafka环境搭建过程,用于前端展示用数据存储以及消费。
1、首先查看系统环境内是否存在java环境,如果没有java环境,安装java环境即可,由于本人系统已安装好,所以不在展示安装过程。
2、在opt文件夹下新建目录zookeeper、kafka,可能需要root权限。
- mkdir -p /opt/zookeeper
- mkdir -p /opt/kafka
3、下载zookeeper、kafka安装包,从阿里云镜像下载,速度较快,使用如下命令。
- wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9.tar.gz
-
- wget https://mirrors.aliyun.com/apache/kafka/2.6.1/kafka_2.13-2.6.1.tgz
4、将下载好的安装压缩包复制到第二步新建的两个文件夹内,使用scp命令。并通过tar命令进行解
5、进入zookeeper/apache-zookeeper-3.5.9/conf 文件夹,并将zoo_sample.cfg复制,重命名为zoo.cfg,并进行编辑。
6、在zookeeper文件夹下新建zk1、zk2、zk3三个文件夹,并把apache-zookeeper-3.5.9-bin文件夹拷贝到三个文件夹内构建伪集群。
7、分别在三个文件夹下修改zoo.cfg文件
8、在zookeeper/zk1、2、3文件夹下分别创建data文件夹,并生成myid文件
9、进入/opt/zookeeper/zk1、2、3/apache-zookeeper-3.5.9-bin/bin文件夹内分别启动zkServer.sh
./zkServer.sh start
10、在opt/kafka文件夹下新建三个kafka 1 2 3文件夹,并把kafka解压缩包复制进去
11、分别进入/opt/kafka/kafka-1/kafka_2.13-2.6.1/config文件夹内修改server.properties文件,注意要与伪集群的id对应
-
- zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
- log.dirs=/opt/kafka/kafka-2/kafka_2.13-2.6.1/logs
- listeners=PLAINTEXT://127.0.0.1:9093
- broker.id=2
12、运行kafka
kafka-server-start.sh kafka-1/kafka_2.13-2.6.1/config/server.properties
此时报错:java.lang.UnsupportedClassVersionError: kafka/Kafka : Unsupported major.minor version 52.0
即kafka使用的java版本为1.8以上,但是此时由于是root权限,使用的java版本为1.7,因此报错。升级root java版本为1.8运行 kafka报错:
Socket error occurred: localhost/127.0.0.1:2183: Connection refused (org.apache.zookeeper.ClientCnxn)
根据网上所查内容 ,删除了/zk1、2、3/data下面的zookeeper_server.pid,之后重启zookeeper,报错
Using config: /opt/zookeeper/zk1/apache-zookeeper-3.5.9-bin/bin/../conf/zoo.cfg
Starting zookeeper ... FAILED TO START
使用 如下命令查看启动失败原因,问题为myid文件缺失,在zk1/data中使用root权限 echo '1'>myid 构建myid文件
./zkServer.sh start-foreground
重启失败,显示Address already in use 表明zookeeper正在运行 杀掉使用该端口的进程,重启服务,zookeeper可以运行,再次启动kafka 发现可以运行
13、对kafka进行测试,使用python脚本,代码如下:
- import datetime
- import json
- import time
- import uuid
-
- from kafka import KafkaProducer
- from kafka.errors import KafkaError
-
- producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094')
- topic = 'test_2021_3_18'
-
- def insert():
- print('begin')
- try:
- n = 0
- while True:
- dic = {}
- dic['id'] = n
- n = n + 1
- dic['myuuid'] = str(uuid.uuid4().hex)
- dic['time'] = datetime.datetime.now().strftime("%Y%m%d %H:%M:%S")
- producer.send(topic, json.dumps(dic).encode())
- print("send:" + json.dumps(dic))
- time.sleep(0.5)
- except KafkaError as e:
- print(e)
- finally:
- producer.close()
- print('done')
-
- if __name__ == '__main__':
- insert()
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
结果入下:
消费者脚本:
- from kafka import KafkaConsumer
-
- # connect to Kafka server and pass the topic we want to consume
- consumer = KafkaConsumer('test_2021_3_18',group_id = 'test_group1', bootstrap_servers='127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094')
- try:
- for msg in consumer:
- print(msg)
- # print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
- except KeyboardInterrupt as e:
- print(e)
成功消费到数据:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。