当前位置:   article > 正文

Kafka与Mysql的组合使用(Windows中)_读取student表的数据内容,将其转为json格式,发送给kafka

读取student表的数据内容,将其转为json格式,发送给kafka

通过下面这个实例来演示Kafka与Mysql的组合使用

假设有一个学生表student,编写python程序完成如下操作

1读取student表的数据内容,将其转为JSON格式,发送给Kafka

 2从Kafka中获取JSON格式数据,打印出来

---------------------------------------------------->

在使用Python操作Mysql之前,需要安装第三方模块python-kafka(在windows命令窗口)

win+r--->输入cmd然后回车

会出现一个小黑窗

输入命令pip install kafka-python安装python-kafka模块

 查看我们安装的模块的版本信息(出现kafka-python2.0.2表示我们安装模块成功)

一、先在Windows命令窗口连接上mysql

win+r---->输入cmd---->回车

会出现一个小黑窗,在小黑窗中输入mysql -u root -p然后回车输入密码

二,在school001数据库下创建student表

1.创建school001数据库: create database school001;

 2.查看现有的数据库:show databases;

如果有school001表示我们创建库成功

 3.使用school001数据库:use school001;

 4.创建student表:create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));

5. 查看数据库中的表:show tables; 

6.向表中插入两条数据

第一条: insert into student values("95001","John","M",23);

第二条: insert into student values("95002","Tom","M",23);

 7.查看student表中的数据:

 (到这里我们的student表就创建成功了!)

三、在python中创建producer.py文件读取student表的数据内容,将其转为JSON格式,发送给Kafka

  1. # 运行前先在win上启动zookeap和kafka
  2. # 导入相关模块
  3. from kafka import KafkaProducer
  4. import json
  5. # 连接kafka json.dumps(v).encode('utf-8')将json格式的数抠转挨为字节类型,然后使用ut了-8进行编码
  6. producer = KafkaProducer(bootstrap_servers = 'localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))
  7. # 定义一个json格式的数第,json格式以键值对形式保存数掂,每个键值对之间使用逗号隔开
  8. data = {
  9. 'sno':'95001',
  10. 'sname':'John',
  11. 'ssex':'M',
  12. 'sage':19
  13. }
  14. # 发送数据
  15. producer.send('test001',data)
  16. # 关闭资源
  17. producer.close()

四、在python中创建consumer.py文件从Kafka中获取JSON格式数据

  1. # 运行前先在win上启动mysql
  2. # 导入消费模块
  3. import json
  4. # 导入kafka的消费模块
  5. from kafka import KafkaConsumer
  6. import json
  7. import pymysql.cursors
  8. # 连接kafka
  9. consumer = KafkaConsumer('test001',bootstrap_servers = 'localhost:9092',group_id=None,auto_offset_reset='earliest')
  10. # 对获取的数据进行解析
  11. for msg in consumer:
  12. # 转换为字符串类型
  13. msg1 = str(msg.value,encoding=('utf-8'))
  14. # 将字符串的数据加载为字典
  15. dict = json.loads(msg1)
  16. # 连接数据库
  17. connect = pymysql.Connect(
  18. host='localhost',
  19. port=3306,
  20. user='root',
  21. passwd='123456',
  22. db='school001',
  23. charset='utf8'
  24. )
  25. # 获取操作数抠库的对象<游标>
  26. cursor = connect.cursor()
  27. #将数抠织存到mysqL(插入数掷)
  28. # 定义sql语句
  29. sql = "select * from student;"
  30. # 将数掐作为参敏传速给sqL,保存到hrgsql
  31. cursor.execute(sql)
  32. # 提交
  33. connect.commit()
  34. for row in cursor.fetchall():
  35. print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row)
  36. print("共查询出", cursor.rowcount, '条数据')
  37. connect.close()

五、运行

1.先在windows命令窗口开启 Zookeeper和Kafka

开启 Zookeeper和Kafka可以参考:(14条消息) Kafka的安装和使用(Windows中)_瑾寰的博客-CSDN博客https://blog.csdn.net/qq_68383591/article/details/130314335?spm=1001.2014.3001.55012.先运行producer.py再运行consumer.py

 (在consumer.py中可以看到student表中的两条数据表示我们成功了!)

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号