赞
踩
通过下面这个实例来演示Kafka与Mysql的组合使用
假设有一个学生表student,编写python程序完成如下操作
1读取student表的数据内容,将其转为JSON格式,发送给Kafka
2从Kafka中获取JSON格式数据,打印出来
---------------------------------------------------->
在使用Python操作Mysql之前,需要安装第三方模块python-kafka(在windows命令窗口)
win+r--->输入cmd然后回车
会出现一个小黑窗
输入命令pip install kafka-python安装python-kafka模块
查看我们安装的模块的版本信息(出现kafka-python2.0.2表示我们安装模块成功)
一、先在Windows命令窗口连接上mysql
win+r---->输入cmd---->回车
会出现一个小黑窗,在小黑窗中输入mysql -u root -p然后回车输入密码
二,在school001数据库下创建student表
1.创建school001数据库: create database school001;
2.查看现有的数据库:show databases;
如果有school001表示我们创建库成功
3.使用school001数据库:use school001;
4.创建student表:create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));
5. 查看数据库中的表:show tables;
6.向表中插入两条数据
第一条: insert into student values("95001","John","M",23);
第二条: insert into student values("95002","Tom","M",23);
7.查看student表中的数据:
(到这里我们的student表就创建成功了!)
三、在python中创建producer.py文件读取student表的数据内容,将其转为JSON格式,发送给Kafka
- # 运行前先在win上启动zookeap和kafka
- # 导入相关模块
- from kafka import KafkaProducer
- import json
- # 连接kafka json.dumps(v).encode('utf-8')将json格式的数抠转挨为字节类型,然后使用ut了-8进行编码
- producer = KafkaProducer(bootstrap_servers = 'localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))
- # 定义一个json格式的数第,json格式以键值对形式保存数掂,每个键值对之间使用逗号隔开
- data = {
- 'sno':'95001',
- 'sname':'John',
- 'ssex':'M',
- 'sage':19
- }
- # 发送数据
- producer.send('test001',data)
- # 关闭资源
- producer.close()
四、在python中创建consumer.py文件从Kafka中获取JSON格式数据
- # 运行前先在win上启动mysql
- # 导入消费模块
- import json
- # 导入kafka的消费模块
- from kafka import KafkaConsumer
- import json
- import pymysql.cursors
- # 连接kafka
- consumer = KafkaConsumer('test001',bootstrap_servers = 'localhost:9092',group_id=None,auto_offset_reset='earliest')
- # 对获取的数据进行解析
- for msg in consumer:
- # 转换为字符串类型
- msg1 = str(msg.value,encoding=('utf-8'))
- # 将字符串的数据加载为字典
- dict = json.loads(msg1)
- # 连接数据库
- connect = pymysql.Connect(
- host='localhost',
- port=3306,
- user='root',
- passwd='123456',
- db='school001',
- charset='utf8'
- )
- # 获取操作数抠库的对象<游标>
- cursor = connect.cursor()
- #将数抠织存到mysqL(插入数掷)
- # 定义sql语句
- sql = "select * from student;"
- # 将数掐作为参敏传速给sqL,保存到hrgsql
- cursor.execute(sql)
- # 提交
- connect.commit()
- for row in cursor.fetchall():
- print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row)
- print("共查询出", cursor.rowcount, '条数据')
- connect.close()
五、运行
1.先在windows命令窗口开启 Zookeeper和Kafka
开启 Zookeeper和Kafka可以参考:(14条消息) Kafka的安装和使用(Windows中)_瑾寰的博客-CSDN博客https://blog.csdn.net/qq_68383591/article/details/130314335?spm=1001.2014.3001.55012.先运行producer.py再运行consumer.py
(在consumer.py中可以看到student表中的两条数据表示我们成功了!)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。