赞
踩
关于mysql安装及基本知识可以查看mysql专栏文章
https://blog.csdn.net/qq_34491508/category_11590629.html
python中可以使用 mysql-connector 来连接使用 MySQL, mysql-connector 是 MySQL 官方提供的驱动器,也可以使用PyMySQL 操作
关于mysql-connector的使用可以参考
Python MySQL – mysql-connector 驱动 | 菜鸟教程 (runoob.com)
无论通过何种方式去连接MySQL,本质上发送的 指令 都是相同的,只是连接的方式和操作形式不同而已。
这篇主要介绍开发中常用的PyMySQL 使用
PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2 中则使用 mysqldb。
PyMySQL 遵循 Python 数据库 API v2.0 规范,并包含了 pure-Python MySQL 客户端库。
在使用 PyMySQL 之前,我们需要确保 PyMySQL 已安装。如果还未安装,我们可以使用以下命令安装最新版的 PyMySQL:
pip3 install PyMySQL
使用pymysql完成对数据库的增删改查
- import pymysql
-
- # 连接MySQL(socket)
- conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8")
- cursor = conn.cursor()
-
- # 1. 查看数据库
- # 发送指令
- cursor.execute("show databases")
- # 获取指令的结果
- result = cursor.fetchall()
- print(result) # (('information_schema',), ('mysql',), ('performance_schema',), ('sys',))
-
- # 2. 创建数据库(新增、删除、修改)
- # 发送指令
- cursor.execute("create database db3 default charset utf8 collate utf8_general_ci")
- conn.commit()
-
- # 3. 查看数据库
- # 发送指令
- cursor.execute("show databases")
- # 获取指令的结果
- result = cursor.fetchall()
- print(result) # (('information_schema',), ('db3',), ('mysql',), ('performance_schema',), ('sys',))
-
- # 4. 删除数据库
- # 发送指令
- cursor.execute("drop database db3")
- conn.commit()
-
- # 3. 查看数据库
- # 发送指令
- cursor.execute("show databases")
- # 获取指令的结果
- result = cursor.fetchall()
- print(result) # (('information_schema',), ('mysql',), ('performance_schema',), ('sys',))
-
- # 5. 进入数据库,查看表
- # 发送指令
- cursor.execute("use mysql")
- cursor.execute("show tables")
- result = cursor.fetchall()
- print(result) # (('columns_priv',), ('db',), ('engine_cost',), ('event',), ('func',), ('general_log',),....
-
- # 关闭连接
- cursor.close()
- conn.close()
- import pymysql
-
- # 连接MySQL
- conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8")
- cursor = conn.cursor()
-
- # 1. 创建数据库
- """
- cursor.execute("create database db4 default charset utf8 collate utf8_general_ci")
- conn.commit()
- """
-
- # 2. 进入数据库、查看数据表
- """
- cursor.execute("use db4")
- cursor.execute("show tables")
- result = cursor.fetchall()
- print(result)
- """
-
- # 3. 进入数据库创建表
- cursor.execute("use db4")
- sql = """
- create table L4(
- id int not null primary key auto_increment,
- title varchar(128),
- content text,
- ctime datetime
- )default charset=utf8;
- """
- cursor.execute(sql)
- conn.commit()
-
- # 4. 查看数据库中的表
- """
- cursor.execute("show tables")
- result = cursor.fetchall()
- print(result)
- """
-
- # 5. 其他 drop table... 略过
-
-
- # 关闭连接
- cursor.close()
- conn.close()
- import pymysql
-
- # 连接MySQL,自动执行 use userdb; -- 进入数据库
- conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db='userdb')
- cursor = conn.cursor()
-
-
- # 1.新增(需commit)
- """
- cursor.execute("insert into tb1(name,password) values('李四','123123')")
- conn.commit()
- """
-
- # 2.删除(需commit)
- """
- cursor.execute("delete from tb1 where id=1")
- conn.commit()
- """
-
- # 3.修改(需commit)
- """
- cursor.execute("update tb1 set name='xx' where id=1")
- conn.commit()
- """
-
- # 4.查询
- """
- cursor.execute("select * from tb where id>10")
- data = cursor.fetchone() # cursor.fetchall()
- print(data)
- """
-
- # 关闭连接
- cursor.close()
- conn.close()
- import pymysql
-
-
- def register():
- print("用户注册")
-
- user = input("请输入用户名:") # alex
- password = input("请输入密码:") # sb
-
- # 连接指定数据
- conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")
- cursor = conn.cursor()
-
- # 执行SQL语句(有SQL注入风险,稍后讲解)
- # sql = 'insert into users(name,password)values("alex","sb")'
- sql = 'insert into users(name,password) values("{}","{}")'.format(user, password)
-
- cursor.execute(sql)
- conn.commit()
-
- # 关闭数据库连接
- cursor.close()
- conn.close()
-
- print("注册成功,用户名:{},密码:{}".format(user, password))
-
-
- def login():
- print("用户登录")
-
- user = input("请输入用户名:")
- password = input("请输入密码:")
-
- # 连接指定数据
- conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb")
- cursor = conn.cursor()
-
- # 执行SQL语句(有SQL注入风险,稍后讲解)
- # sql = select * from users where name='xxx' and password='123'
- sql = "select * from users where name='{}' and password='{}'".format(user, password)
- cursor.execute(sql)
-
- result = cursor.fetchone() # 去向mysql获取结果
- # None
- # (1,wupeiqi,123)
-
-
- # 关闭数据库连接
- cursor.close()
- conn.close()
-
- if result:
- print("登录成功", result)
- else:
- print("登录失败")
-
-
- def run():
- choice = input("1.注册;2.登录")
- if choice == '1':
- register()
- elif choice == '2':
- login()
- else:
- print("输入错误")
-
-
- if __name__ == '__main__':
- run()
- import pymysql
-
- # 输入用户名和密码
- user = input("请输入用户名:") # ' or 1=1 --
- pwd = input("请输入密码:") # 123
-
-
- conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8",db='usersdb')
- cursor = conn.cursor()
-
- # 基于字符串格式化来 拼接SQL语句
- # sql = "select * from users where name='alex' and password='123'"
- # sql = "select * from users where name='' or 1=1 -- ' and password='123'"
- sql = "select * from users where name='{}' and password='{}'".format(user, pwd)
- cursor.execute(sql)
-
- result = cursor.fetchone()
- print(result) # None,不是None
-
- cursor.close()
- conn.close()
如果用户在输入user时,输入了: ' or 1=1 --
,这样即使用户输入的密码不存在,也会可以通过验证。
为什么呢?
因为在SQL拼接时,拼接后的结果是:
select * from users where name='' or 1=1 -- ' and password='123'
注意:在MySQL中 --
表示注释。
那么,在Python开发中 如何来避免SQL注入呢?
切记,SQL语句不要在使用python的字符串格式化,而是使用pymysql的execute方法。
- import pymysql
-
- # 输入用户名和密码
- user = input("请输入用户名:")
- pwd = input("请输入密码:")
-
- conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db='userdb')
-
- cursor = conn.cursor()
-
- cursor.execute("select * from users where name=%s and password=%s", [user, pwd])
- # 或
- # cursor.execute("select * from users where name=%(n1)s and password=%(n2)s", {"n1": user, 'n2': pwd})
-
- result = cursor.fetchone()
- print(result)
-
- cursor.close()
- conn.close()
上面pymysql对mysql的关键代码就是sql语句,其他都是公共的,所以可以封装成简单的数据库操作工具类
- import pymysql
-
- # 连接地址相关可以从配置文件读取
- dataSource = {
- 'host': 'localhost',
- 'port': 3306,
- 'user': 'root',
- 'password': 'root',
- 'db': 'db_sys',
- 'charset': 'utf8'
- }
-
-
- def get_conn_cursor():
- conn = pymysql.connect(host=dataSource['host'], port=dataSource['port'], user=dataSource['user'],
- password=dataSource['password'], charset=dataSource['charset'],
- db=dataSource['db'])
- cursor = conn.cursor()
- return conn, cursor
-
-
- def close_conn_cursor(*args):
- for item in args:
- item.close()
-
-
- def exec(sql, **kwargs):
- conn, cursor = get_conn_cursor()
- cursor.execute(sql, kwargs)
- conn.commit()
- close_conn_cursor(conn, cursor)
-
-
- def fetch_one(sql, **kwargs):
- conn, cursor = get_conn_cursor()
- cursor.execute(sql, kwargs)
- result = cursor.fetchone()
- close_conn_cursor(conn, cursor)
- return result
-
-
- def fetch_all(sql, **kwargs):
- conn, cursor = get_conn_cursor()
- cursor.execute(sql, kwargs)
- result = cursor.fetchall()
- close_conn_cursor(conn, cursor)
- return result
-
-
- if __name__ == '__main__':
- users = fetch_all("select * from user")
- print(users) # ((1, 'admin', 'admin', None, None), (2, 'test', 'test', None, None))
在操作数据库时需要使用数据库连接池。
- pip3 install pymysql
- pip3 install dbutils
- import threading
- import pymysql
- from dbutils.pooled_db import PooledDB
-
- MYSQL_DB_POOL = PooledDB(
- creator=pymysql, # 使用链接数据库的模块
- maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数
- mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
- maxcached=3, # 链接池中最多闲置的链接,0和None不限制
- blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
- setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
- ping=0,
- # ping MySQL服务端,检查是否服务可用。
- # 如:0 = None = never, 1 = default = whenever it is requested,
- # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
- host='127.0.0.1',
- port=3306,
- user='root',
- password='root123',
- database='userdb',
- charset='utf8'
- )
-
-
- def task():
- # 去连接池获取一个连接
- conn = MYSQL_DB_POOL.connection()
- cursor = conn.cursor(pymysql.cursors.DictCursor)
-
- cursor.execute('select sleep(2)')
- result = cursor.fetchall()
- print(result)
-
- cursor.close()
- # 将连接交换给连接池
- conn.close()
-
- def run():
- for i in range(10):
- t = threading.Thread(target=task)
- t.start()
-
-
- if __name__ == '__main__':
- run()
- # db.py
- import pymysql
- from dbutils.pooled_db import PooledDB
-
-
- class DBHelper(object):
-
- def __init__(self):
- # TODO 此处配置,可以去配置文件中读取。
- self.pool = PooledDB(
- creator=pymysql, # 使用链接数据库的模块
- maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数
- mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
- maxcached=3, # 链接池中最多闲置的链接,0和None不限制
- blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
- setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
- ping=0,
- # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
- host='127.0.0.1',
- port=3306,
- user='root',
- password='root123',
- database='userdb',
- charset='utf8'
- )
-
- def get_conn_cursor(self):
- conn = self.pool.connection()
- cursor = conn.cursor(pymysql.cursors.DictCursor)
- return conn, cursor
-
- def close_conn_cursor(self, *args):
- for item in args:
- item.close()
-
- def exec(self, sql, **kwargs):
- conn, cursor = self.get_conn_cursor()
-
- cursor.execute(sql, kwargs)
- conn.commit()
-
- self.close_conn_cursor(conn, cursor)
-
- def fetch_one(self, sql, **kwargs):
- conn, cursor = self.get_conn_cursor()
-
- cursor.execute(sql, kwargs)
- result = cursor.fetchone()
-
- self.close_conn_cursor(conn, cursor)
- return result
-
- def fetch_all(self, sql, **kwargs):
- conn, cursor = self.get_conn_cursor()
-
- cursor.execute(sql, kwargs)
- result = cursor.fetchall()
-
- self.close_conn_cursor(conn, cursor)
-
- return result
-
-
- db = DBHelper()
测试
- from db import db
-
- db.exec("insert into d1(name) values(%(name)s)", name="666")
-
- ret = db.fetch_one("select * from d1")
- print(ret)
-
- ret = db.fetch_one("select * from d1 where id=%(nid)s", nid=3)
- print(ret)
-
- ret = db.fetch_all("select * from d1")
- print(ret)
-
- ret = db.fetch_all("select * from d1 where id>%(nid)s", nid=2)
- print(ret)
如果想要让他也支持 with 上下文管理。
with 获取连接: 执行SQL(执行完毕后,自动将连接交还给连接池)
- # db_context.py
- import threading
- import pymysql
- from dbutils.pooled_db import PooledDB
-
- POOL = PooledDB(
- creator=pymysql, # 使用链接数据库的模块
- maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数
- mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
- maxcached=3, # 链接池中最多闲置的链接,0和None不限制
- blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
- setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
- ping=0,
- host='127.0.0.1',
- port=3306,
- user='root',
- password='root123',
- database='userdb',
- charset='utf8'
- )
-
-
- class Connect(object):
- def __init__(self):
- self.conn = conn = POOL.connection()
- self.cursor = conn.cursor(pymysql.cursors.DictCursor)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.cursor.close()
- self.conn.close()
-
- def exec(self, sql, **kwargs):
- self.cursor.execute(sql, kwargs)
- self.conn.commit()
-
- def fetch_one(self, sql, **kwargs):
- self.cursor.execute(sql, kwargs)
- result = self.cursor.fetchone()
- return result
-
- def fetch_all(self, sql, **kwargs):
- self.cursor.execute(sql, kwargs)
- result = self.cursor.fetchall()
- return result
测试
- from db_context import Connect
-
- with Connect() as obj:
- # print(obj.conn)
- # print(obj.cursor)
- ret = obj.fetch_one("select * from d1")
- print(ret)
-
- ret = obj.fetch_one("select * from d1 where id=%(id)s", id=3)
- print(ret)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。