当前位置:   article > 正文

Kafuka Ubuntu16.04.1 环境搭建过程记录

Kafuka Ubuntu16.04.1 环境搭建过程记录

Kafka Ubuntu16.04.1 环境搭建过程记录


本文记录kafka环境搭建过程,用于前端展示用数据存储以及消费。

1、首先查看系统环境内是否存在java环境,如果没有java环境,安装java环境即可,由于本人系统已安装好,所以不在展示安装过程。

2、在opt文件夹下新建目录zookeeper、kafka,可能需要root权限。

  1. mkdir -p /opt/zookeeper
  2. mkdir -p /opt/kafka

3、下载zookeeper、kafka安装包,从阿里云镜像下载,速度较快,使用如下命令。

  1. wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9.tar.gz
  2. wget https://mirrors.aliyun.com/apache/kafka/2.6.1/kafka_2.13-2.6.1.tgz

4、将下载好的安装压缩包复制到第二步新建的两个文件夹内,使用scp命令。并通过tar命令进行解

5、进入zookeeper/apache-zookeeper-3.5.9/conf 文件夹,并将zoo_sample.cfg复制,重命名为zoo.cfg,并进行编辑。

6、在zookeeper文件夹下新建zk1、zk2、zk3三个文件夹,并把apache-zookeeper-3.5.9-bin文件夹拷贝到三个文件夹内构建伪集群。

7、分别在三个文件夹下修改zoo.cfg文件

8、在zookeeper/zk1、2、3文件夹下分别创建data文件夹,并生成myid文件

9、进入/opt/zookeeper/zk1、2、3/apache-zookeeper-3.5.9-bin/bin文件夹内分别启动zkServer.sh

./zkServer.sh start

10、在opt/kafka文件夹下新建三个kafka 1 2 3文件夹,并把kafka解压缩包复制进去

11、分别进入/opt/kafka/kafka-1/kafka_2.13-2.6.1/config文件夹内修改server.properties文件,注意要与伪集群的id对应

  1. zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
  2. log.dirs=/opt/kafka/kafka-2/kafka_2.13-2.6.1/logs
  3. listeners=PLAINTEXT://127.0.0.1:9093
  4. broker.id=2

12、运行kafka

kafka-server-start.sh kafka-1/kafka_2.13-2.6.1/config/server.properties

此时报错:java.lang.UnsupportedClassVersionError: kafka/Kafka : Unsupported major.minor version 52.0

即kafka使用的java版本为1.8以上,但是此时由于是root权限,使用的java版本为1.7,因此报错。升级root java版本为1.8运行 kafka报错:

Socket error occurred: localhost/127.0.0.1:2183: Connection refused (org.apache.zookeeper.ClientCnxn)

根据网上所查内容 ,删除了/zk1、2、3/data下面的zookeeper_server.pid,之后重启zookeeper,报错

Using config: /opt/zookeeper/zk1/apache-zookeeper-3.5.9-bin/bin/../conf/zoo.cfg
Starting zookeeper ... FAILED TO START

使用 如下命令查看启动失败原因,问题为myid文件缺失,在zk1/data中使用root权限 echo '1'>myid 构建myid文件

./zkServer.sh start-foreground

重启失败,显示Address already in use 表明zookeeper正在运行 杀掉使用该端口的进程,重启服务,zookeeper可以运行,再次启动kafka 发现可以运行

 

13、对kafka进行测试,使用python脚本,代码如下:

  1. import datetime
  2. import json
  3. import time
  4. import uuid
  5. from kafka import KafkaProducer
  6. from kafka.errors import KafkaError
  7. producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094')
  8. topic = 'test_2021_3_18'
  9. def insert():
  10. print('begin')
  11. try:
  12. n = 0
  13. while True:
  14. dic = {}
  15. dic['id'] = n
  16. n = n + 1
  17. dic['myuuid'] = str(uuid.uuid4().hex)
  18. dic['time'] = datetime.datetime.now().strftime("%Y%m%d %H:%M:%S")
  19. producer.send(topic, json.dumps(dic).encode())
  20. print("send:" + json.dumps(dic))
  21. time.sleep(0.5)
  22. except KafkaError as e:
  23. print(e)
  24. finally:
  25. producer.close()
  26. print('done')
  27. if __name__ == '__main__':
  28. insert()

结果入下:

 

消费者脚本:

  1. from kafka import KafkaConsumer
  2. # connect to Kafka server and pass the topic we want to consume
  3. consumer = KafkaConsumer('test_2021_3_18',group_id = 'test_group1', bootstrap_servers='127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094')
  4. try:
  5. for msg in consumer:
  6. print(msg)
  7. # print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
  8. except KeyboardInterrupt as e:
  9. print(e)

成功消费到数据:

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/902721
推荐阅读
  

闽ICP备14008679号