赞
踩
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='root123', charset='utf8')
cursor = conn.cursor()
cursor.execute('create database db01 default charset utf8 collate utf8_general_ci')
conn.commit()
cursor.execute('use db01')
sql_create_table_l01= """
create table l01(
id int not null primary key auto_increment,
name varchar(32) not null
)default charset=utf8;
"""
cursor.execute(sql_create_table_l01)
conn.commit()
cursor.execute('show tables')
result = cursor.fetchall()
print(result)
cursor.execute("insert into l01(id,name) values('1','流水')")
conn.commit()
cursor.execute("delete from l01 where id=1")
conn.commit()
cursor.execute("update tb1 set name='xx' where id=1")
conn.commit()
cursor.execute("select * from tb where id>10")
data = cursor.fetchone() # cursor.fetchall()
print(data)
cursor.close()
conn.close()
import pymysql conn = pymysql.connect(host='', port=3306, user='', password='', charset=utf8) cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 执行存储过程 cursor.callproc('存储过程名称') result = cursor.fetchall() # 得到执行存储过中的结果集 # 执行带参数存储过程 cursor.callproc('存储过程名称', args=('参数1','参数2', '参数3')) # 获取执行完存储的参数 cursor.execute('select @_名称_0') result = cursor.fetchall() # {@_名称_0: 参数} cursor.close() conn.close() print(result)
其他略
import pymysql def register(): print("用户注册") user = input("请输入用户名:") password = input("请输入密码:") # 连接指定数据 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb") cursor = conn.cursor() sql = 'insert into users(name,password) values("{}","{}")'.format(user, password) cursor.execute(sql) conn.commit() # 关闭数据库连接 cursor.close() conn.close() print("注册成功,用户名:{},密码:{}".format(user, password)) def login(): print("用户登录") user = input("请输入用户名:") password = input("请输入密码:") # 连接指定数据 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8", db="usersdb") cursor = conn.cursor() sql = "select * from users where name='{}' and password='{}'".format(user, password) cursor.execute(sql) result = cursor.fetchone() # 去向mysql获取结果 # 关闭数据库连接 cursor.close() conn.close() if result: print("登录成功", result) else: print("登录失败") def run(): choice = input("1.注册;2.登录") if choice == '1': register() elif choice == '2': login() else: print("输入错误") if __name__ == '__main__': run()
import pymysql # 输入用户名和密码 user = input("请输入用户名:") # ' or 1=1 -- pwd = input("请输入密码:") # 123 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root123', charset="utf8",db='usersdb') cursor = conn.cursor() # 基于字符串格式化来 拼接SQL语句 # sql = "select * from users where name='liu' and password='123'" # sql = "select * from users where name='' or 1=1 -- ' and password='123'" sql = "select * from users where name='{}' and password='{}'".format(user, pwd) cursor.execute(sql) result = cursor.fetchone() print(result) # None,不是None cursor.close() conn.close()
如果用户在输入user时,输入了: ’ or 1=1 – ,这样即使用户输入的密码不存在,也会可以通过验证。
原因:
在SQL拼接时,拼接后的结果是:
select * from users where name='' or 1=1 -- ' and password='123'
注意: 在mysql中 – 表示注释
切记, sql语句不要在使用Python的字符串格式化, 而是用pymysql的execute方法
import pymysql # 输入用户名和密码 user_name = input('请输入用户名: ') user_pwd = input('请输入密码: ') conn = pymysql.connect(host='', password=3306, user='', passwd='', charset='utf8',db='userdb') cursor = conn.cursor() cursor.execute('select * from users name=s% and password=s%',[user_name, user_pwd]) # 或者 cursor.execute('select * from users where name=%(n1)s and password=%(n2)s',{'n1':user_name, 'n2':user_pwd}) result = cursor.fetchone() print(result) cursor.close() conn.close()
在操作数控是需要使用数据库连接池
import pymysql import threading from dbutils.pooled_db import PooledDB MYSQL_DB_POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=3, # 链接池中最多闲置的链接,0和None不限制 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。 # 如:0 = None = never, 1 = default = whenever it is requested, # 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='', port=3306, user='', password='', database='', charset='' ) def task(): # 去连接池获取一个连接 conn = MYSQL_DB_POOL() cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execut('select sleep(2)') result = cursor.fetchall() print(result) cursor.close() conn.close() def run(): for i in range(10): t = threading.Thread(target=task) t.start() if __name__ == '__main__': run()
基于数据库连接池开发一个功能sql操作类,方便以后操作数据库
import pymysql from dbutils.pooled_db import PooledDB class DBHelper(object): def __init__(self): # TODO 此处配置,可以去配置文件中读取。 self.pool = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=3, # 链接池中最多闲置的链接,0和None不限制 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='root123', database='userdb', charset='utf8' ) def get_conn_cursor(self): conn = self.pool.connection() cursor = conn.cursor(pymysql.cursors.DictCursor) return conn, cursor def close_conn_cursor(self, *args): for item in args: item.close() def exec(self, sql, **kwargs): conn, cursor = self.get_conn_cursor() cursor.execute(sql, kwargs) conn.commit() self.close_conn_cursor(conn, cursor) def fetch_one(self, sql, **kwargs): conn, cursor = self.get_conn_cursor() cursor.execute(sql, kwargs) result = cursor.fetchone() self.close_conn_cursor(conn, cursor) return result def fetch_all(self, sql, **kwargs): conn, cursor = self.get_conn_cursor() cursor.execute(sql, kwargs) result = cursor.fetchall() self.close_conn_cursor(conn, cursor) return result db = DBHelper()
from db import db
db.exec("insert into d1(name) values(%(name)s)", name="Kevin")
ret = db.fetch_one("select * from d1")
print(ret)
ret = db.fetch_one("select * from d1 where id=%(nid)s", nid=3)
print(ret)
ret = db.fetch_all("select * from d1")
print(ret)
ret = db.fetch_all("select * from d1 where id>%(nid)s", nid=2)
print(ret)
import threading import pymysql from dbutils.pooled_db import PooledDB POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=3, # 链接池中最多闲置的链接,0和None不限制 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, host='127.0.0.1', port=3306, user='root', password='root123', database='userdb', charset='utf8' ) class Connect(object): def __init__(self): self.conn = conn = POOL.connection() self.cursor = conn.cursor(pymysql.cursors.DictCursor) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() def exec(self, sql, **kwargs): self.cursor.execute(sql, kwargs) self.conn.commit() def fetch_one(self, sql, **kwargs): self.cursor.execute(sql, kwargs) result = self.cursor.fetchone() return result def fetch_all(self, sql, **kwargs): self.cursor.execute(sql, kwargs) result = self.cursor.fetchall() return result
from db_context import Connect
with Connect() as obj:
# print(obj.conn)
# print(obj.cursor)
ret = obj.fetch_one("select * from d1")
print(ret)
ret = obj.fetch_one("select * from d1 where id=%(id)s", id=3)
print(ret)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。