当前位置:   article > 正文

python操作mysql_python mysql

python mysql

一、python操作mysql简介

关于mysql安装及基本知识可以查看mysql专栏文章

https://blog.csdn.net/qq_34491508/category_11590629.html

python中可以使用 mysql-connector 来连接使用 MySQL, mysql-connector 是 MySQL 官方提供的驱动器,也可以使用PyMySQL 操作

关于mysql-connector的使用可以参考

Python MySQL – mysql-connector 驱动 | 菜鸟教程 (runoob.com)

无论通过何种方式去连接MySQL,本质上发送的 指令 都是相同的,只是连接的方式和操作形式不同而已。

这篇主要介绍开发中常用的PyMySQL 使用

PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2 中则使用 mysqldb。

PyMySQL 遵循 Python 数据库 API v2.0 规范,并包含了 pure-Python MySQL 客户端库。

在使用 PyMySQL 之前,我们需要确保 PyMySQL 已安装。如果还未安装,我们可以使用以下命令安装最新版的 PyMySQL:

pip3 install PyMySQL

二、pymysql管理数据库

使用pymysql完成对数据库的增删改查

  1. import pymysql
  2. # 连接MySQL(socket)
  3. conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8")
  4. cursor = conn.cursor()
  5. # 1. 查看数据库
  6. # 发送指令
  7. cursor.execute("show databases")
  8. # 获取指令的结果
  9. result = cursor.fetchall()
  10. print(result) # (('information_schema',), ('mysql',), ('performance_schema',), ('sys',))
  11. # 2. 创建数据库(新增、删除、修改)
  12. # 发送指令
  13. cursor.execute("create database db3 default charset utf8 collate utf8_general_ci")
  14. conn.commit()
  15. # 3. 查看数据库
  16. # 发送指令
  17. cursor.execute("show databases")
  18. # 获取指令的结果
  19. result = cursor.fetchall()
  20. print(result) # (('information_schema',), ('db3',), ('mysql',), ('performance_schema',), ('sys',))
  21. # 4. 删除数据库
  22. # 发送指令
  23. cursor.execute("drop database db3")
  24. conn.commit()
  25. # 3. 查看数据库
  26. # 发送指令
  27. cursor.execute("show databases")
  28. # 获取指令的结果
  29. result = cursor.fetchall()
  30. print(result) # (('information_schema',), ('mysql',), ('performance_schema',), ('sys',))
  31. # 5. 进入数据库,查看表
  32. # 发送指令
  33. cursor.execute("use mysql")
  34. cursor.execute("show tables")
  35. result = cursor.fetchall()
  36. print(result) # (('columns_priv',), ('db',), ('engine_cost',), ('event',), ('func',), ('general_log',),....
  37. # 关闭连接
  38. cursor.close()
  39. conn.close()

三、pymysql管理数据表

  1. import pymysql
  2. # 连接MySQL
  3. conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8")
  4. cursor = conn.cursor()
  5. # 1. 创建数据库
  6. """
  7. cursor.execute("create database db4 default charset utf8 collate utf8_general_ci")
  8. conn.commit()
  9. """
  10. # 2. 进入数据库、查看数据表
  11. """
  12. cursor.execute("use db4")
  13. cursor.execute("show tables")
  14. result = cursor.fetchall()
  15. print(result)
  16. """
  17. # 3. 进入数据库创建表
  18. cursor.execute("use db4")
  19. sql = """
  20. create table L4(
  21. id int not null primary key auto_increment,
  22. title varchar(128),
  23. content text,
  24. ctime datetime
  25. )default charset=utf8;
  26. """
  27. cursor.execute(sql)
  28. conn.commit()
  29. # 4. 查看数据库中的表
  30. """
  31. cursor.execute("show tables")
  32. result = cursor.fetchall()
  33. print(result)
  34. """
  35. # 5. 其他 drop table... 略过
  36. # 关闭连接
  37. cursor.close()
  38. conn.close()

四、pymysql管理数据行

1、数据的增删改查

  1. import pymysql
  2. # 连接MySQL,自动执行 use userdb; -- 进入数据库
  3. conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db='userdb')
  4. cursor = conn.cursor()
  5. # 1.新增(需commit)
  6. """
  7. cursor.execute("insert into tb1(name,password) values('李四','123123')")
  8. conn.commit()
  9. """
  10. # 2.删除(需commit)
  11. """
  12. cursor.execute("delete from tb1 where id=1")
  13. conn.commit()
  14. """
  15. # 3.修改(需commit)
  16. """
  17. cursor.execute("update tb1 set name='xx' where id=1")
  18. conn.commit()
  19. """
  20. # 4.查询
  21. """
  22. cursor.execute("select * from tb where id>10")
  23. data = cursor.fetchone() # cursor.fetchall()
  24. print(data)
  25. """
  26. # 关闭连接
  27. cursor.close()
  28. conn.close()

2、案例:实现 注册、登录功能

  1. import pymysql
  2. def register():
  3. print("用户注册")
  4. user = input("请输入用户名:") # alex
  5. password = input("请输入密码:") # sb
  6. # 连接指定数据
  7. conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")
  8. cursor = conn.cursor()
  9. # 执行SQL语句(有SQL注入风险,稍后讲解)
  10. # sql = 'insert into users(name,password)values("alex","sb")'
  11. sql = 'insert into users(name,password) values("{}","{}")'.format(user, password)
  12. cursor.execute(sql)
  13. conn.commit()
  14. # 关闭数据库连接
  15. cursor.close()
  16. conn.close()
  17. print("注册成功,用户名:{},密码:{}".format(user, password))
  18. def login():
  19. print("用户登录")
  20. user = input("请输入用户名:")
  21. password = input("请输入密码:")
  22. # 连接指定数据
  23. conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")
  24. cursor = conn.cursor()
  25. # 执行SQL语句(有SQL注入风险,稍后讲解)
  26. # sql = select * from users where name='xxx' and password='123'
  27. sql = "select * from users where name='{}' and password='{}'".format(user, password)
  28. cursor.execute(sql)
  29. result = cursor.fetchone() # 去向mysql获取结果
  30. # None
  31. # (1,wupeiqi,123)
  32. # 关闭数据库连接
  33. cursor.close()
  34. conn.close()
  35. if result:
  36. print("登录成功", result)
  37. else:
  38. print("登录失败")
  39. def run():
  40. choice = input("1.注册;2.登录")
  41. if choice == '1':
  42. register()
  43. elif choice == '2':
  44. login()
  45. else:
  46. print("输入错误")
  47. if __name__ == '__main__':
  48. run()

3、关于SQL注入

  1. import pymysql
  2. # 输入用户名和密码
  3. user = input("请输入用户名:") # ' or 1=1 --
  4. pwd = input("请输入密码:") # 123
  5. conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8",db='usersdb')
  6. cursor = conn.cursor()
  7. # 基于字符串格式化来 拼接SQL语句
  8. # sql = "select * from users where name='alex' and password='123'"
  9. # sql = "select * from users where name='' or 1=1 -- ' and password='123'"
  10. sql = "select * from users where name='{}' and password='{}'".format(user, pwd)
  11. cursor.execute(sql)
  12. result = cursor.fetchone()
  13. print(result) # None,不是None
  14. cursor.close()
  15. conn.close()

如果用户在输入user时,输入了: ' or 1=1 -- ,这样即使用户输入的密码不存在,也会可以通过验证。

为什么呢?

因为在SQL拼接时,拼接后的结果是:

select * from users where name='' or 1=1 -- ' and password='123'

注意:在MySQL中 -- 表示注释。

那么,在Python开发中 如何来避免SQL注入呢?

切记,SQL语句不要在使用python的字符串格式化,而是使用pymysql的execute方法。

  1. import pymysql
  2. # 输入用户名和密码
  3. user = input("请输入用户名:")
  4. pwd = input("请输入密码:")
  5. conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db='userdb')
  6. cursor = conn.cursor()
  7. cursor.execute("select * from users where name=%s and password=%s", [user, pwd])
  8. # 或
  9. # cursor.execute("select * from users where name=%(n1)s and password=%(n2)s", {"n1": user, 'n2': pwd})
  10. result = cursor.fetchone()
  11. print(result)
  12. cursor.close()
  13. conn.close()

五、工具类

上面pymysql对mysql的关键代码就是sql语句,其他都是公共的,所以可以封装成简单的数据库操作工具类

  1. import pymysql
  2. # 连接地址相关可以从配置文件读取
  3. dataSource = {
  4. 'host': 'localhost',
  5. 'port': 3306,
  6. 'user': 'root',
  7. 'password': 'root',
  8. 'db': 'db_sys',
  9. 'charset': 'utf8'
  10. }
  11. def get_conn_cursor():
  12. conn = pymysql.connect(host=dataSource['host'], port=dataSource['port'], user=dataSource['user'],
  13. password=dataSource['password'], charset=dataSource['charset'],
  14. db=dataSource['db'])
  15. cursor = conn.cursor()
  16. return conn, cursor
  17. def close_conn_cursor(*args):
  18. for item in args:
  19. item.close()
  20. def exec(sql, **kwargs):
  21. conn, cursor = get_conn_cursor()
  22. cursor.execute(sql, kwargs)
  23. conn.commit()
  24. close_conn_cursor(conn, cursor)
  25. def fetch_one(sql, **kwargs):
  26. conn, cursor = get_conn_cursor()
  27. cursor.execute(sql, kwargs)
  28. result = cursor.fetchone()
  29. close_conn_cursor(conn, cursor)
  30. return result
  31. def fetch_all(sql, **kwargs):
  32. conn, cursor = get_conn_cursor()
  33. cursor.execute(sql, kwargs)
  34. result = cursor.fetchall()
  35. close_conn_cursor(conn, cursor)
  36. return result
  37. if __name__ == '__main__':
  38. users = fetch_all("select * from user")
  39. print(users) # ((1, 'admin', 'admin', None, None), (2, 'test', 'test', None, None))

六、数据库连接池

在操作数据库时需要使用数据库连接池。

  1. pip3 install pymysql
  2. pip3 install dbutils
  1. import threading
  2. import pymysql
  3. from dbutils.pooled_db import PooledDB
  4. MYSQL_DB_POOL = PooledDB(
  5. creator=pymysql, # 使用链接数据库的模块
  6. maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数
  7. mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
  8. maxcached=3, # 链接池中最多闲置的链接,0和None不限制
  9. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  10. setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
  11. ping=0,
  12. # ping MySQL服务端,检查是否服务可用。
  13. # 如:0 = None = never, 1 = default = whenever it is requested,
  14. # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
  15. host='127.0.0.1',
  16. port=3306,
  17. user='root',
  18. password='root123',
  19. database='userdb',
  20. charset='utf8'
  21. )
  22. def task():
  23. # 去连接池获取一个连接
  24. conn = MYSQL_DB_POOL.connection()
  25. cursor = conn.cursor(pymysql.cursors.DictCursor)
  26. cursor.execute('select sleep(2)')
  27. result = cursor.fetchall()
  28. print(result)
  29. cursor.close()
  30. # 将连接交换给连接池
  31. conn.close()
  32. def run():
  33. for i in range(10):
  34. t = threading.Thread(target=task)
  35. t.start()
  36. if __name__ == '__main__':
  37. run()

七、基于数据库连接池工具类

1、第一种:单例和方法工具类

  1. # db.py
  2. import pymysql
  3. from dbutils.pooled_db import PooledDB
  4. class DBHelper(object):
  5. def __init__(self):
  6. # TODO 此处配置,可以去配置文件中读取。
  7. self.pool = PooledDB(
  8. creator=pymysql, # 使用链接数据库的模块
  9. maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数
  10. mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
  11. maxcached=3, # 链接池中最多闲置的链接,0和None不限制
  12. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  13. setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
  14. ping=0,
  15. # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
  16. host='127.0.0.1',
  17. port=3306,
  18. user='root',
  19. password='root123',
  20. database='userdb',
  21. charset='utf8'
  22. )
  23. def get_conn_cursor(self):
  24. conn = self.pool.connection()
  25. cursor = conn.cursor(pymysql.cursors.DictCursor)
  26. return conn, cursor
  27. def close_conn_cursor(self, *args):
  28. for item in args:
  29. item.close()
  30. def exec(self, sql, **kwargs):
  31. conn, cursor = self.get_conn_cursor()
  32. cursor.execute(sql, kwargs)
  33. conn.commit()
  34. self.close_conn_cursor(conn, cursor)
  35. def fetch_one(self, sql, **kwargs):
  36. conn, cursor = self.get_conn_cursor()
  37. cursor.execute(sql, kwargs)
  38. result = cursor.fetchone()
  39. self.close_conn_cursor(conn, cursor)
  40. return result
  41. def fetch_all(self, sql, **kwargs):
  42. conn, cursor = self.get_conn_cursor()
  43. cursor.execute(sql, kwargs)
  44. result = cursor.fetchall()
  45. self.close_conn_cursor(conn, cursor)
  46. return result
  47. db = DBHelper()

测试

  1. from db import db
  2. db.exec("insert into d1(name) values(%(name)s)", name="666")
  3. ret = db.fetch_one("select * from d1")
  4. print(ret)
  5. ret = db.fetch_one("select * from d1 where id=%(nid)s", nid=3)
  6. print(ret)
  7. ret = db.fetch_all("select * from d1")
  8. print(ret)
  9. ret = db.fetch_all("select * from d1 where id>%(nid)s", nid=2)
  10. print(ret)

 

2、上下文管理方式工具类(推荐)

如果想要让他也支持 with 上下文管理。

with 获取连接:
	执行SQL(执行完毕后,自动将连接交还给连接池)
  1. # db_context.py
  2. import threading
  3. import pymysql
  4. from dbutils.pooled_db import PooledDB
  5. POOL = PooledDB(
  6. creator=pymysql, # 使用链接数据库的模块
  7. maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数
  8. mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
  9. maxcached=3, # 链接池中最多闲置的链接,0和None不限制
  10. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  11. setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
  12. ping=0,
  13. host='127.0.0.1',
  14. port=3306,
  15. user='root',
  16. password='root123',
  17. database='userdb',
  18. charset='utf8'
  19. )
  20. class Connect(object):
  21. def __init__(self):
  22. self.conn = conn = POOL.connection()
  23. self.cursor = conn.cursor(pymysql.cursors.DictCursor)
  24. def __enter__(self):
  25. return self
  26. def __exit__(self, exc_type, exc_val, exc_tb):
  27. self.cursor.close()
  28. self.conn.close()
  29. def exec(self, sql, **kwargs):
  30. self.cursor.execute(sql, kwargs)
  31. self.conn.commit()
  32. def fetch_one(self, sql, **kwargs):
  33. self.cursor.execute(sql, kwargs)
  34. result = self.cursor.fetchone()
  35. return result
  36. def fetch_all(self, sql, **kwargs):
  37. self.cursor.execute(sql, kwargs)
  38. result = self.cursor.fetchall()
  39. return result

测试

  1. from db_context import Connect
  2. with Connect() as obj:
  3. # print(obj.conn)
  4. # print(obj.cursor)
  5. ret = obj.fetch_one("select * from d1")
  6. print(ret)
  7. ret = obj.fetch_one("select * from d1 where id=%(id)s", id=3)
  8. print(ret)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/980475
推荐阅读
相关标签
  

闽ICP备14008679号