赞
踩
Python 连接 MySQL 数据库,可以使用 mysql-connector 或 PyMySQL ,任选其一即可。
pymysql 是线程安全的 (搜索 thread,可以看到 thread_safe=1,同时函数 thread_safe() 返回 True ):https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/__init__.py
连接数据库前,请先确认以下事项:
链接 Mysql 的 TESTDB 数据库:
- import pymysql
-
-
- def main():
- # 打开数据库连接
- mysql_conn = pymysql.connect("localhost", "testuser", "test123", "TESTDB")
-
- try:
- # 使用 cursor() 方法创建一个游标对象 cursor
- cursor = mysql_conn.cursor()
- sql_string = 'insert into test_table (col1, c0l2) values (1,2,3)'
- cursor.execute(sql_string)
- mysql_conn.commit()
-
- # 使用 execute() 方法执行 SQL 查询
- cursor.execute("SELECT VERSION()")
-
- # 使用 fetchone() 方法获取单条数据.
- data = cursor.fetchone()
-
- print("Database version : %s " % data)
- except BaseException as be:
- # 关闭数据库连接
- mysql_conn.rollback()
- finally:
- mysql_conn.close()
-
-
- if __name__ == '__main__':
- main()
- pass

游标对象:就是对数据库进行具体的操作,比如 "增、删、改、查" 等一系列操作
游标类型 如下:
类型 | 描述 |
Cursor | 普通的游标对象,默认创建的游标对象 |
SSCursor | 不缓存游标,主要用于当操作需要返回大量数据的时候 |
DictCursor | 以字典的形式返回操作结果 |
SSDictCursor | 不缓存游标,将结果以字典的形式进行返回 |
Python3 MySQL:https://www.runoob.com/python3/python3-mysql.html
如果数据库连接存在我们可以使用execute()方法来为数据库创建表,如下所示创建表EMPLOYEE:
- import pymysql
-
- # 打开数据库连接
- db = pymysql.connect("localhost","testuser","test123","TESTDB" )
-
- # 使用 cursor() 方法创建一个游标对象 cursor
- cursor = db.cursor()
-
- # 使用 execute() 方法执行 SQL,如果表存在则删除
- cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")
-
- # 使用预处理语句创建表
- sql = """CREATE TABLE EMPLOYEE (
- FIRST_NAME CHAR(20) NOT NULL,
- LAST_NAME CHAR(20),
- AGE INT,
- SEX CHAR(1),
- INCOME FLOAT )"""
-
- cursor.execute(sql)
-
- # 关闭数据库连接
- db.close()

Python 查询 Mysql 使用 fetchone() 方法获取单条数据,使用 fetchall() 方法获取多条数据。
查询 EMPLOYEE 表中 salary(工资)字段大于 1000 的所有数据:
- import pymysql
-
- # 打开数据库连接
- db = pymysql.connect("localhost","testuser","test123","TESTDB" )
-
- # 使用cursor()方法获取操作游标
- cursor = db.cursor()
-
- # SQL 查询语句
- sql = "SELECT * FROM EMPLOYEE \
- WHERE INCOME > %s" % (1000)
- try:
- # 执行SQL语句
- cursor.execute(sql)
- # 获取所有记录列表
- results = cursor.fetchall()
- for row in results:
- fname = row[0]
- lname = row[1]
- age = row[2]
- sex = row[3]
- income = row[4]
- # 打印结果
- print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
- (fname, lname, age, sex, income ))
- except:
- print ("Error: unable to fetch data")
-
- # 关闭数据库连接
- db.close()

示例:
- import pymysql
-
- class DB():
- def __init__(self, host='localhost', port=3306, db='', user='root', passwd='root', charset='utf8'):
- # 建立连接
- self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset)
- # 创建游标,操作设置为字典类型
- self.cur = self.conn.cursor(cursor = pymysql.cursors.DictCursor)
-
- def __enter__(self):
- # 返回游标
- return self.cur
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- # 提交数据库并执行
- self.conn.commit()
- # 关闭游标
- self.cur.close()
- # 关闭数据库连接
- self.conn.close()
-
-
- if __name__ == '__main__':
- with DB(host='192.168.68.129',user='root',passwd='zhumoran',db='text3') as db:
- db.execute('select * from course')
- print(db)
- for i in db:
- print(i)

执行 SQL INSERT 语句向表 EMPLOYEE 插入记录:
- import pymysql
-
- # 打开数据库连接
- db = pymysql.connect("localhost","testuser","test123","TESTDB" )
-
- # 使用cursor()方法获取操作游标
- cursor = db.cursor()
-
- # SQL 插入语句
- sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
- LAST_NAME, AGE, SEX, INCOME)
- VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
- try:
- # 执行sql语句
- cursor.execute(sql)
- # 提交到数据库执行
- db.commit()
- except:
- # 如果发生错误则回滚
- db.rollback()
-
- # 关闭数据库连接
- db.close()

以上例子也可以写成如下形式:
- import pymysql
-
- # 打开数据库连接
- db = pymysql.connect("localhost","testuser","test123","TESTDB" )
-
- # 使用cursor()方法获取操作游标
- cursor = db.cursor()
-
- # SQL 插入语句
- sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \
- LAST_NAME, AGE, SEX, INCOME) \
- VALUES ('%s', '%s', %s, '%s', %s)" % \
- ('Mac', 'Mohan', 20, 'M', 2000)
- try:
- # 执行sql语句
- cursor.execute(sql)
- # 执行sql语句
- db.commit()
- except:
- # 发生错误时回滚
- db.rollback()
-
- # 关闭数据库连接
- db.close()

以下代码使用变量向SQL语句中传递参数:
- ..................................
- user_id = "test123"
- password = "password"
-
- con.execute('insert into Login values( %s, %s)' % \
- (user_id, password))
- ..................................
单条插入数据:
-
- #!/usr/bin/python3
-
- import pymysql
-
- # 打开数据库连接
- db = pymysql.connect("localhost","testuser","test123","TESTDB" )
-
- # 使用cursor()方法获取操作游标
- cursor = db.cursor()
-
- # SQL 插入语句 里面的数据类型要对应
- sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \
- LAST_NAME, AGE, SEX, INCOME) \
- VALUES ('%s', '%s', %s, '%s', %s)" % \
- ('Mac', 'Mohan', 20, 'M', 2000)
- try:
- # 执行sql语句
- cursor.execute(sql)
- # 执行sql语句
- db.commit()
- except:
- # 发生错误时回滚
- db.rollback()
-
- # 关闭数据库连接
- db.close()

批量插入数据:
注意:批量插入数据 与 单条插入数据 的区别:
- #!/usr/bin/env python
- # -*-encoding:utf-8-*-
-
- import pymysql
-
- # 打开数据库连接
- db = pymysql.connect("localhost","root","123","testdb")
-
- # 使用 cursor() 方法创建一个游标对象 cursor
- cursor = db.cursor()
-
- # SQL 插入语句
- sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \
- LAST_NAME, AGE, SEX, INCOME) \
- VALUES (%s,%s,%s,%s,%s)"
- # 区别与单条插入数据,VALUES ('%s', '%s', %s, '%s', %s) 里面不用引号
-
- val = (('li', 'si', 16, 'F', 1000),
- ('Bruse', 'Jerry', 30, 'F', 3000),
- ('Lee', 'Tomcat', 40, 'M', 4000),
- ('zhang', 'san', 18, 'M', 1500))
- try:
- # 执行sql语句
- cursor.executemany(sql,val)
- # 提交到数据库执行
- db.commit()
- except:
- # 如果发生错误则回滚
- db.rollback()
-
- # 关闭数据库连接
- db.close()

:https://blog.csdn.net/zhou16333/article/details/95867827
mysql 语法支持如果数据存在则更新,不存在则插入,首先判断数据存在还是不存在的那个字段要设置成unique索引:
在 MySQL 中,当你需要处理主键或唯一索引约束冲突的数据插入时,有几种方法可以选择:ON DUPLICATE KEY UPDATE
、REPLACE INTO
和简单的 REPLACE
语句。这些方法各有利弊,适用于不同的场景。
on duplicate key update
INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...)
ON DUPLICATE KEY UPDATE col1 = new_val1, col2 = new_val2, ...;
replace into 和 replace
REPLACE INTO table_name (col1, col2, ...) VALUES (val1, val2, ...);
REPLACE table_name SET col1 = val1, col2 = val2, ...;
REPLACE
就像普通的 INSERT
操作一样插入新数据。REPLACE
会先删除旧的冲突记录,然后插入新记录。REPLACE
时,可能会导致旧记录的完全删除,包括那些不冲突的字段也会被新记录替换。选择哪种方法取决于你的具体需求:
ON DUPLICATE KEY UPDATE
。REPLACE
或 REPLACE INTO
是更直接的选择。但请留意,这种方式可能会影响到与被删除记录相关的外键约束。更新操作用于更新数据表的数据,以下实例将 TESTDB 表中 SEX 为 'M' 的 AGE 字段递增 1:
- import pymysql
-
- # 打开数据库连接
- db = pymysql.connect("localhost","testuser","test123","TESTDB" )
-
- # 使用cursor()方法获取操作游标
- cursor = db.cursor()
-
- # SQL 更新语句
- sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')
- try:
- # 执行SQL语句
- cursor.execute(sql)
- # 提交到数据库执行
- db.commit()
- except:
- # 发生错误时回滚
- db.rollback()
-
- # 关闭数据库连接
- db.close()

使用 pymysql 的 course.executemany(sql, update_list) 进行批量更新
示 例:
- db = pymysql.connect(user='root', password='mysql', database='test', host='127.0.0.1', port=3306, charset='utf8mb4')
-
- name_list = ["re", "gh", "ds", "D"] # 存储name的值
- age_list = ["10", "20", "30", "40"] # 存储age的值
- id_list = ["1", "2", "3", "4"] # 存储id的值
- val_list = [[name_list[i], age_list[i], id_list[i]] for i in range(len(id_list))]
- print(val_list)
- # [['re', '10', '1'], ['gh', '20', '2'], ['ds', '30', '3'], ['D', '40', '4']]
-
- with db.cursor() as cursor:
- try:
- sql = "UPDATE test SET name=(%s), age=(%s) WHERE id=(%s)"
- cursor.executemany(sql, val_list)
- db.commit()
- except:
- db.rollback()
- db.close()

注意:插入数字也是 %s
- # coding=utf-8
-
- import time
- import pymysql.cursors
-
- conn= pymysql.connect(host='rm-xxx.mysql.rds.aliyuncs.com',
- port=3306,
- user='dba',
- password='xxxxx',
- db='app',
- charset='utf8')
- cursor= conn.cursor()
- # conn.ping(reconnect=True)
-
- count= 0
- posts=[]
- for postin posts:
- try:
- sql= 'DELETE FROM user_like WHERE user_id=%s and like_post_id=%s'
- ret= cursor.executemany(sql, ((1,2), (3,4), (5,6)))
- conn.commit()
- except Exception as e:
- print("batch Exception:", e)
-
- count+=1
-
- cursor.close()
- conn.close()
-
-
- # 基本sql语句写法
- # INSERT INTO star(name,gender) VALUES(“XX”, 20)
- # SELECT * FROM app.user_post WHERE post_id LIKE '%xxxx%';
- # UPDATE app.user_post SET post_id=replace(post_id,'\'','’);
- # UPDATE app.user_post SET province = ‘xxx', city =‘xxx';
- # DELETE FROM app.user_post where updated_at = '0000-00-00 00:00:00’;
-
-
- # 带参数构造语句的基本写法
- # sql = 'select user_id, post_id from user_post where user_id="{user_id}" and post_id="{post_id}"'.format(user_id=user_id, post_id=post_id)
- # sql = 'SELECT count(*) FROM user_like where like_post_id = "%s"' % ("xxx")
- # sql = 'update star set gender="{gender}", height="{height}" where star_id="{star_id}"'.format(gender='M', height=180, star_id=123456789)

删除操作用于删除数据表中的数据,以下实例演示了删除数据表 EMPLOYEE 中 AGE 大于 20 的所有数据:
- import pymysql
-
- # 打开数据库连接
- db = pymysql.connect("localhost","testuser","test123","TESTDB" )
-
- # 使用cursor()方法获取操作游标
- cursor = db.cursor()
-
- # SQL 删除语句
- sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
- try:
- # 执行SQL语句
- cursor.execute(sql)
- # 提交修改
- db.commit()
- except:
- # 发生错误时回滚
- db.rollback()
-
- # 关闭连接
- db.close()

事务机制可以确保数据一致性。
对于支持事务的数据库, 在 Python 数据库编程中,当游标建立之时,就自动开始了一个隐形的数据库事务。commit()方法游标的所有更新操作,rollback()方法回滚当前游标的所有操作。每一个方法都开始了一个新的事务。
事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。
示例:
- # SQL删除记录语句
- sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
- try:
- # 执行SQL语句
- cursor.execute(sql)
- # 向数据库提交
- db.commit()
- except:
- # 发生错误时回滚
- db.rollback()
"本地线程" 可以实现线程之间的数据隔离。保证每个线程都只有自己的一份数据,在操作时不会影响别人的,即使是多线程,自己的值也是互相隔离的
- import threading
- import time
-
- # 本地线程对象
- local_values = threading.local()
-
-
- def func(num):
- """
- # 第一个线程进来,本地线程对象会为他创建一个
- # 第二个线程进来,本地线程对象会为他创建一个
- {
- 线程1的唯一标识:{name:1},
- 线程2的唯一标识:{name:2},
- }
- :param num:
- :return:
- """
- local_values.name = num # 4
- # 线程停下来了
- time.sleep(2)
- # 第二个线程: local_values.name,去local_values中根据自己的唯一标识作为key,获取value中name对应的值
- print(local_values.name, threading.current_thread().name)
-
-
- for i in range(5):
- th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
- th.start()
'运行
创建一个连接池,为所有线程提供连接,线程使用连接时获取连接,使用完毕放回连接池。
线程不断地重用连接池里的连接。
- import sys
- import threading
- import pymysql
- from dbutils.persistent_db import PersistentDB
-
-
- POOL = PersistentDB(
- creator=pymysql, # 使用链接数据库的模块
- maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
- setsession=[], # 开始会话前执行的命令列表。
- ping=0,
- closeable=False,
- threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
- host='127.0.0.1',
- port=3306,
- user='root',
- password='123',
- database='pooldb',
- charset='utf8'
- )
-
-
- def func():
- # conn = SteadyDBConnection()
- conn = POOL.connection()
- cursor = conn.cursor()
- cursor.execute('select * from tb1')
- result = cursor.fetchall()
- cursor.close()
- conn.close() # 不是真的关闭,而是假的关闭。 conn = pymysql.connect() conn.close()
-
- conn = POOL.connection()
- cursor = conn.cursor()
- cursor.execute('select * from tb1')
- result = cursor.fetchall()
- cursor.close()
- conn.close()
-
-
- for i in range(10):
- t = threading.Thread(target=func)
- t.start()

安装:pip install mysql-connector-python
连接MySQL
- import mysql.connector
-
- # 接收参数:user, password, host, port=3306, unix_socket and database
- # 返回一个MySQLConnection Object
- conn = mysql.connector.connect(
- host='localhost',
- user='root',
- password='root',
- database='test'
- )
执行SQL命令
执行sql命令之前,需要先创建一个查询,调用一下cursor()方法,这个方法名光标的意思,可以理解为,命令行中,执行sql语句之前,先要有一个光标行,在光标行中进行操作。
调用cursor()返回的结果就是光标行(cmd,简称cmd),然后调用cmd的execute()方法,传入要执行的sql即可,不过需要注意的是,执行sql的结果,执行的结果仍旧保存在cmd中。
执行select操作,使用fetchall()一次性取回所有的结果集
- import mysql.connector
-
- # 接收参数:user, password, host, port=3306, unix_socket and database
- # 返回一个MySQLConnection Object
- conn = mysql.connector.connect(
- host='localhost',
- user='root',
- password='root',
- database='test'
- )
-
- # 创建一个查询
- cmd = conn.cursor()
-
- # 执行一条原生的SQL语句,执行结果保存在cmd中,没有返回值
- cmd.execute("select id, name, age from stu")
- # 可以使用fetchall(),获取所有的查询结果集,返回值为一个tuple,每一个元素是一个list
- res = cmd.fetchall()
- print(res)
- # [(1, 'LiMing', 20), (2, 'XiaoHua', 30), (3, 'LiLei', 10)]

执行select操作,使用fetchone()每次只取一条记录
- import mysql.connector
-
- conn = mysql.connector.connect(
- host='localhost',
- user='root',
- password='root',
- database='test'
- )
-
- cmd = conn.cursor()
-
- cmd.execute("select id, name, age from stu")
-
- # 使用fetchone()返回一条结果集,每调用一次之后,内部指针会指向下一条结果集
- print(cmd.fetchone()) # (1, 'LiMing', 20)
- print(cmd.fetchone()) # (2, 'XiaoHua', 30)
- print(cmd.fetchone()) # (3, 'LiLei', 10)

执行select操作,使用fetchmany(num)指定每次返回的num条结果集
- import mysql.connector
-
- conn = mysql.connector.connect(
- host='localhost',
- user='root',
- password='root',
- database='test'
- )
-
- cmd = conn.cursor()
-
- cmd.execute("select * from stu")
-
- res = cmd.fetchmany(2) # 指定返回2条记录
- print(res)
- # [(1, 'LiMing', 20), (2, 'XiaoHua', 30)]
-
- res = cmd.fetchmany(1) # 指定返回1条记录
- print(res)
- # [(3, 'LiLei', 10)]

insert、update、delete操作,也都是使用execute方法,只需要将要执行的sql语句传入即可。
可以在执行增删改操作之后,rowcount属性保存着受影响的记录数。
每次插入一条数据
- import mysql.connector
-
- # 接收参数:user, password, host, port=3306, unix_socket and database
- # 返回一个MySQLConnection Object
- conn = mysql.connector.connect(
- host='localhost',
- user='root',
- password='root',
- database='test'
- )
-
- # 创建一个查询
- cmd = conn.cursor()
-
- # 执行原生SQL语句
- cmd.execute("insert into stu (id, name, age) values (4, 'LiBai', 99)")
- print(cmd.rowcount) # 1
-
- cmd.execute("select * from stu")
- res = cmd.fetchall()
- print(res)
- # [(1, 'LiMing', 20), (2, 'XiaoHua', 30), (3, 'LiLei', 10), (4, 'LiBai', 99)]

使用预处理格式(占位符格式)。可以查看一下execute()方法的定义:
- class MySQLCursor(CursorBase):
- '''
- 省略很多方法和变量
- '''
-
- def execute(self, operation, params=None, multi=False):
- """Executes the given operation
-
- Executes the given operation substituting any markers with
- the given parameters.
-
- For example, getting all rows where id is 5:
- cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
-
- The multi argument should be set to True when executing multiple
- statements in one operation. If not set and multiple results are
- found, an InterfaceError will be raised.
-
- If warnings where generated, and connection.get_warnings is True, then
- self._warnings will be a list containing these warnings.
-
- Returns an iterator when multi is True, otherwise None.
- """

示例:
- import mysql.connector
-
- conn = mysql.connector.connect(
- host='localhost',
- user='root',
- password='root',
- database='test'
- )
-
- cmd = conn.cursor()
-
- # 注意,在SQL中的占位符,统一写%s, 具体的类型,是在tuple中,传入的参数元素类型决定
- cmd.execute("select * from stu where id=%s and name=%s", (1, 'LiMing'))
- res = cmd.fetchall()
- print(res)
- # [(1, 'LiMing', 20)]

DBUtils 用户手册:https://webwareforpython.github.io/DBUtils/main.html
为什么 使用 数据库 连接池
解决方法:
DBUtils 是一个管理 "数据库连接池" 的 Python 模块,用于多线程环境的数据库连接,为 "高频率、高并发" 的数据库访问提供更好的性能。DBUtils 允许对非线程安全的数据库接口进行线程安全包装和连接。如果使用的是流行的对象关系映射器 SQLObject 或 SQLAlchemy 等,则不需要 DBUtils,因为它们带有自己的连接池。
DBUtils 包含两个子模块:
通用 DB-API 2 变体中模块的依赖关系如下图所示:
经典 PyGreSQL 变体中模块的依赖关系类似:
DBUtils 模块中包含具有类似名称的类,该类提供相应的功能。例如:模块 dbutils.pooled_db 包含类 PooledDB。
dbutils.persistent_db 使用任何 DB-API 2 数据库模块,提供线程专用的数据库连接,并自动管理连接。各个数据库连接保持分配给相应的线程,并且在线程的生命周期内不会关闭。
每当线程首次打开数据库连接时,都会打开与数据库的新连接,从现在开始将用于此特定线程。当线程关闭数据库连接时,它仍将保持打开状态,以便下次同一线程请求连接时,可以使用此已打开的连接。当线程死线时,连接将自动关闭。简而言之:persistent_db 尝试回收数据库连接以提高线程应用程序的整体数据库访问性能,但它确保线程之间永远不会共享连接。因此,即使底层 DB-API 模块在连接级别不是线程安全的,persistent_db 也能完美运行,并且当其他线程更改数据库会话或执行跨多个 SQL 命令的事务时,它将避免出现问题。
dbutils.pooled_db 使用任何 DB-API 2 数据库模块实现一个稳定的、线程安全的数据库缓存连接池,这些连接可以透明地重用。提供线程间可共享的数据库连接,并自动管理连接。pooled_db可以在不同线程之间共享打开的数据库连接。默认情况下,如果将连接池设置为正值 maxshared,并且基础 DB-API 2 在连接级别是线程安全的,则会发生这种情况,但您也可以请求不会在线程之间共享的专用数据库连接。除了共享连接池之外,您还可以设置一个至少包含 mincached 和最多 maxcached 空闲连接的池,每当线程请求专用数据库连接或共享连接池尚未满时,都会使用该池。当线程关闭不再共享的连接时,它将返回到空闲连接池,以便可以再次回收。如果底层 DB-API 模块不是线程安全的,则将使用线程锁来确保pooled_db连接是线程安全的。因此,您无需担心这一点,但是每当更改数据库会话或执行跨多个 SQL 命令的事务时,都应小心使用专用连接。
persistent_db 和 pooled_db 都具有相同的目的,即通过回收数据库连接来提高数据库访问性能,同时即使数据库连接中断也能保持稳定性。
那么应该使用这两个模块中的哪一个呢?如果你的应用程序保持恒定数量的线程(经常使用数据库),persistent_db 将更有意义。在这种情况下,您将始终拥有相同数量的打开数据库连接。但是,如果您的应用程序经常启动和结束线程,那么最好使用 pooled_db。后者还将允许进行更多的微调,特别是当您使用的是线程安全的 DB-API 2 模块时。
两个模块接口相似,可以轻松地从一个模块切换到另一个模块。
另外,实际使用的数据库驱动也有所依赖,比如SQLite数据库只能使用PersistentDB作连接池。 下载地址:http://www.webwareforpython.org/downloads/DBUtils/
安装:pip install DBUtils
这里只介绍 persistent_db 模块和更复杂的 pooled_db 模块。
查看 pooled_db 模块的文档,python解释器中执行:help(pooled_db)
使用 db.close() 可以关闭 persistent 连接
通过调用 begin() 方法显式启动事务。这可确保透明的重新打开将暂停,直到事务结束,并且连接将在同一线程重用之前回滚。
通过将 threadlocal 参数设置为 threading.local,获取连接可能会变得更快一些,但这可能并非在所有环境中都有效(例如,已知 mod_wsgi 会导致问题,因为它会在请求之间清除 threading.local 数据)。
使用 pymysql 作为 DB-API 2 数据库模块,并且本地数据库 mydb 的每个连接都重用 1000 次:
- import pymysql # import used DB-API 2 module
- from dbutils.persistent_db import PersistentDB
-
- persist = PersistentDB(pymysql, 1000, database='mydb')
- db = persist.connection()
-
- # db.close() 将被静默忽略,不需要手动调用
示例:至少包含五个连接到本地数据库 mydb 的池
- import pymysql # import used DB-API 2 module
- from dbutils.pooled_db import PooledDB
-
- pool = PooledDB(pymysql, 5, database='mydb')
- db = pool.connection() # 设置连接池后,可以从该池请求数据库连接
- # ...
- # 如果不再需要 db,则应立即执行db.close() 将连接归还到池中
- db.close()
如果设置了非零 maxshared 参数,并且 DB-API 2 模块允许这样做,则默认情况下可以与其他线程共享连接。如果要建立专用连接,则使用:db = pool.connection(shareable=False) 或者 db = pool.dedicated_connection()
警告:在线程环境中,切勿执行以下操作:pool.connection().cursor().execute(...) 这将过早释放连接以供重用,如果连接不是线程安全的,这可能是致命的。确保连接对象在使用时保持活动状态,如下所示:
- db = pool.connection()
- cur = db.cursor()
- cur.execute(...)
- res = cur.fetchone()
- cur.close() # or del cur
- db.close() # or del db
还可以将上下文管理器用于更简单的代码:
- with pool.connection() as db:
- with db.cursor() as cur:
- cur.execute(...)
- res = cur.fetchone()
通过调用 begin() 方法显式启动事务。这可确保连接不会与其他线程共享,透明的重新打开将暂停,直到事务结束,并且连接将在返回到连接池之前回滚。
有时希望在 DBUtils 使用连接之前准备连接,而仅使用正确的参数是无法实现的。例如,pyodbc 可能需要通过调用连接的 setencoding() 方法来配置连接。您可以通过将修改后的 connect() 函数作为创建者(第一个参数)传递给 PersistentDB 或 PooledDB 来做到这一点,如下所示:
- from pyodbc import connect
- from dbutils.pooled_db import PooledDB
-
- def creator():
- con = connect(...)
- con.setdecoding(...)
- return con
-
- creator.dbapi = pyodbc
-
- db_pool = PooledDB(creator, mincached=5)
如果你使用的是流行的对象关系映射器之一 SQLObject 或 SQLAlchemy,则不需要 DBUtils,因为它们自带连接池。SQLObject 2 (SQL-API) 实际上是从 DBUtils 借用一些代码来将池拆分为单独的层。
另请注意,当您使用带有 mod_python 或 mod_wsgi 的 Apache Web 服务器等解决方案时,您的 Python 代码通常会在 Web 服务器的子进程的上下文中运行。因此,如果您使用的是 pooled_db 模块,并且其中几个子进程正在运行,则您将拥有同样多的数据库连接池。如果这些进程运行许多线程,这可能仍然是一种合理的方法,但如果这些进程不会生成多个工作线程,就像 Apache 的“prefork”多处理模块一样,这种方法就没有意义了。如果运行此类配置,则应求助于支持多处理的连接池中间件,例如 PostgreSQL 数据库的 pgpool 或 pgbouncer。
一些相关和替代软件的链接:
Webware for Python framework
Python DB-API 2
PostgreSQL database
PyGreSQL Python adapter for PostgreSQL
pgpool middleware for PostgreSQL connection pooling
pgbouncer lightweight PostgreSQL connection pooling
SQLObject object-relational mapper
SQLAlchemy object-relational mapper
- import pymysql
- conn = pymysql.connect(
- host='127.0.0.1', port=3306,
- db='db_test',
- user='root', password='xxx',
- charset='utf8'
- )
- cur = conn.cursor()
- SQL = "select * from table1"
- r = cur.execute(SQL)
- r = cur.fetchall()
- cur.close()
- conn.close()
- import pymysql # import used DB-API 2 module
- from dbutils.persistent_db import PersistentDB
-
- persist = PersistentDB(pymysql, 1000, database='mydb')
- # 获取连接池。连接池对象只初始化一次,一般可以作为模块级代码来确保。
- db_conn = persist.connection()
-
- # 将被静默忽略,不需要手动调用
- db_conn.close()
- import pymysql
- from dbutils.pooled_db import PooledDB
-
- # 定义连接参数
- pool = PooledDB(
- creator=pymysql,
- maxconnections=6,
- mincached=2,
- maxcached=5,
- blocking=True,
- host='localhost',
- user='root',
- passwd='123456',
- db='mydb',
- port=3306,
- charset='utf8mb4'
- )
-
-
- def main():
- # 从连接池获取连接
- conn = pool.connection()
- cursor = conn.cursor()
-
- # 执行 SQL 语句
- sql = "SELECT * FROM students"
- cursor.execute(sql)
- result = cursor.fetchall()
-
- # 处理查询结果
- for row in result:
- print(row)
-
- # 关闭游标和连接
- cursor.close()
- conn.close()
-
-
- if __name__ == '__main__':
- main()

- """
- 使用DBUtils数据库连接池中的连接,操作数据库
- """
- import json
- import pymysql
- import datetime
- from DBUtils.PooledDB import PooledDB
- import pymysql
-
-
- class MysqlClient(object):
- __pool = None;
-
- def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
- maxusage=100, setsession=None, reset=True,
- host='127.0.0.1', port=3306, db='test',
- user='root', passwd='123456', charset='utf8mb4'):
- """
- :param mincached:连接池中空闲连接的初始数量
- :param maxcached:连接池中空闲连接的最大数量
- :param maxshared:共享连接的最大数量
- :param maxconnections:创建连接池的最大数量
- :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
- :param maxusage:单个连接的最大重复使用次数
- :param setsession:optional list of SQL commands that may serve to prepare
- the session, e.g. ["set datestyle to ...", "set time zone ..."]
- :param reset:how connections should be reset when returned to the pool
- (False or None to rollback transcations started with begin(),
- True to always issue a rollback for safety's sake)
- :param host:数据库ip地址
- :param port:数据库端口
- :param db:库名
- :param user:用户名
- :param passwd:密码
- :param charset:字符编码
- """
-
- if not self.__pool:
- self.__class__.__pool = PooledDB(pymysql,
- mincached, maxcached,
- maxshared, maxconnections, blocking,
- maxusage, setsession, reset,
- host=host, port=port, db=db,
- user=user, passwd=passwd,
- charset=charset,
- cursorclass=pymysql.cursors.DictCursor
- )
- self._conn = None
- self._cursor = None
- self.__get_conn()
-
- def __get_conn(self):
- self._conn = self.__pool.connection();
- self._cursor = self._conn.cursor();
-
- def close(self):
- try:
- self._cursor.close()
- self._conn.close()
- except Exception as e:
- print(e)
-
- def __execute(self, sql, param=()):
- count = self._cursor.execute(sql, param)
- print(count)
- return count
-
- @staticmethod
- def __dict_datetime_obj_to_str(result_dict):
- """把字典里面的datatime对象转成字符串,使json转换不出错"""
- if result_dict:
- result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
- result_dict.update(result_replace)
- return result_dict
-
- def select_one(self, sql, param=()):
- """查询单个结果"""
- count = self.__execute(sql, param)
- result = self._cursor.fetchone()
- """:type result:dict"""
- result = self.__dict_datetime_obj_to_str(result)
- return count, result
-
- def select_many(self, sql, param=()):
- """
- 查询多个结果
- :param sql: qsl语句
- :param param: sql参数
- :return: 结果数量和查询结果集
- """
- count = self.__execute(sql, param)
- result = self._cursor.fetchall()
- """:type result:list"""
- [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
- return count, result
-
- def execute(self, sql, param=()):
- count = self.__execute(sql, param)
- return count
-
- def begin(self):
- """开启事务"""
- self._conn.autocommit(0)
-
- def end(self, option='commit'):
- """结束事务"""
- if option == 'commit':
- self._conn.autocommit()
- else:
- self._conn.rollback()
-
-
- if __name__ == "__main__":
- mc = MysqlClient()
- sql1 = 'SELECT * FROM shiji WHERE id = 1'
- result1 = mc.select_one(sql1)
- print(json.dumps(result1[1], ensure_ascii=False))
-
- sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
- param = (2, 3, 4)
- print(json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False))

ORM 全称 Object Relational Mapping(对象关系映射)。是把 "关系数据库的表结构" 映射到 "Python对象" 上,这样就可以直接操纵 Python 对象,不用再写SQL进行操作,也就是在代码层面考虑的是对象,而不是SQL。具体的实现方式是
这样就可以通过使用对象中的方法来操作数据,而不必关心底层的 SQL 语句和数据库细节。
使用 ORM 的好处包括:
常见的 ORM 框架包括:
注意:ORM 并不能解决所有数据库问题。在某些情况下,复杂的查询和性能要求可能需要直接使用原生 SQL。因此,根据具体的需求和场景,谨慎选择和使用合适的 ORM 框架。
dictalchemy 是一个第三方扩展库,用于为 SQLAlchemy 模型类添加 .asdict() 和 .fromdict() 方法,
这样可以方便地将模型实例转换为字典,或者从字典创建/更新模型实例。
文档:https://pythonhosted.org/dictalchemy/
安装:pip install dictalchemy3 (移除对 Python2的支持,兼容最新的SQLAlchemy )
示例
- from sqlalchemy import create_engine, Column, Integer, String, CHAR
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import sessionmaker
- from dictalchemy import make_class_dictable
-
- Base = declarative_base()
-
- # 让 Base 类的所有派生类支持 dictalchemy 的方法
- make_class_dictable(Base)
-
-
- class City(Base):
- __tablename__ = 'city'
-
- ID = Column(Integer, primary_key=True)
- Name = Column(CHAR(35), nullable=False)
- CountryCode = Column(CHAR(3), nullable=False)
- District = Column(CHAR(20), nullable=False)
- Population = Column(Integer, nullable=False)
-
-
- # 创建数据库引擎和session
- engine = create_engine('mysql+pymysql://root:root@127.0.0.1:3306/world')
- Session = sessionmaker(bind=engine)
- session = Session()
-
- # 创建表格(在实际数据库中)
- Base.metadata.create_all(engine)
-
- # 添加数据
- # new_city = City(ID=9999, Name='test', CountryCode='test', District='test', Population=9999)
- # session.add(new_city)
- # session.commit()
-
- # 查询数据
- user = session.query(City).first()
- # 将查询到的数据转换成字典
- user_dict = user.asdict()
- print(user_dict)

在 Python 中,要将一个对象转变成 dict 类型,通常有几种方式:
使用__dict__属性:如果对象是一个普通的 Python 对象,它通常有一个__dict__属性,包含了所有实例属性。
- class MyObject:
- def __init__(self, name, value):
- self.name = name
- self.value = value
-
- obj = MyObject('test', 123)
- obj_dict = obj.__dict__
- print(obj_dict)
- # 输出: {'name': 'test', 'value': 123}
使用 vars() 函数:vars() 函数也可以用来获取对象的 __dict__ 属性。
- obj = MyObject('test', 123)
- obj_dict = vars(obj)
- print(obj_dict)
- # 输出: {'name': 'test', 'value': 123}
使用字典推导:如果对象的属性较少,或者你只想要部分属性,可以使用字典推导。
- obj = MyObject('test', 123)
- obj_dict = {key: getattr(obj, key) for key in ['name', 'value']}
- print(obj_dict)
- # 输出: {'name': 'test', 'value': 123}
通过自定义方法:在类中定义一个方法来将对象的属性转换为字典。
- class MyObject:
- def __init__(self, name, value):
- self.name = name
- self.value = value
-
- def to_dict(self):
- return {'name': self.name, 'value': self.value}
-
- obj = MyObject('test', 123)
- obj_dict = obj.to_dict()
- print(obj_dict)
- # 输出: {'name': 'test', 'value': 123}
或者
- from sqlalchemy.orm import class_mapper
-
- def serialize(model):
- """将 SQLAlchemy model 实例序列化为字典"""
- columns = [c.key for c in class_mapper(model.__class__).columns]
- return dict((c, getattr(model, c)) for c in columns)
-
- # 假设有一个 SQLAlchemy 模型实例 user
- user_as_dict = serialize(user)
在 Python 中最有名的 ORM 框架是 SQLAlchemy 。它可以与任意的第三方 web 框架相结合,如 flask、tornado、django、fastapi 等。SQLALchemy 相较于 Django ORM 来说更贴近原生的 SQL 语句,因此学习难度较低。
安装:pip install SQLAlchemy
Python中强大的通用ORM框架:SQLAlchemy:https://zhuanlan.zhihu.com/p/444930067
Python ORM之SQLAlchemy全面指南:https://zhuanlan.zhihu.com/p/387078089
SQLAlchemy 文档:https://www.sqlalchemy.org/
SQLAlchemy入门和进阶:https://zhuanlan.zhihu.com/p/27400862
SQLAlchemy 2.0 教程:https://wiki.masantu.com/sqlalchemy-tutorial/
SQLALchemy 由以下5个部分组成:
图示如下:
运行流程:
相关概念
常见数据类型
SQLAlchemy 必须依赖其他操纵数据库的模块才能进行使用,也就是上面提到的 DBAPI。
SQLAlchemy 配合 DBAPI 使用时,链接字符串也有所不同,如下所示:
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
任何 SQLAlchemy 应用程序的开始都是一个 Engine 对象,提供到数据库的连接池。Engine对象通常是一个只为特定数据库服务器创建一次的全局对象,并使用一个URL字符串进行配置,该字符串将描述如何连接到数据库主机或后端。
>>> from sqlalchemy import create_engine
>>> engine = create_engine('sqlite:///:memory:', echo=True)初始化创建 engine ,engine 内部维护了一个Pool(连接池)和Dialect(方言),方言来识别具体连接数据库种类。
创建好了 engine 的同时,Pool 和 Dialect 也已经创建好了,但是此时并没有真正与数据库连接,等到执行具体的语句.connect()等时才会连接到数据库。
create_engine 的参数有很多,一些比较常用的:
repr()
其参数列表的默认日志处理程序。utf-8
PoolListener
将接收连接池事件的对象。也就是在 Python 中创建的一个类,对应着数据库中的一张表,类的每个属性,就是这个表的字段名,这种 类对应于数据库中表的类,就称为映射类。
我们要创建一个映射类,是基于基类定义的,每个映射类都要继承这个基类 declarative_base()。
from sqlalchemy.orm import declarative_base
Base = declarative_base()
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
declarative_base() 是 sqlalchemy 内部封装的一个方法,通过其构造一个基类,这个基类以及它的子类,可以将Python类和数据库表关联映射起来。
数据库表模型类通过 __tablename__ 和表关联起来,Column 表示数据表的列。
示例:
- from sqlalchemy import Column, Integer, String
-
-
- class User(Base):
- __tablename__ = "users"
-
- id = Column(Integer, primary_key=True)
- name = Column(String)
- fullname = Column(String)
- nickname = Column(String)
-
- def __repr__(self):
- return "<User(name='%s', fullname='%s', nickname='%s')>" % (
- self.name,
- self.fullname,
- self.nickname,
- )

通过定义 User类,已经定义了关于表的信息,称为 table metadata,也就是表的元数据。
User.__table__
Table('users', MetaData(),
Column('id', Integer(), table=<users>, primary_key=True, nullable=False),
Column('name', String(), table=<users>),
Column('fullname', String(), table=<users>),
Column('nickname', String(), table=<users>), schema=None)
开始 创建表:如果存在则忽略,执行下面代码,就会发现在 db 中创建了 users 表。
Base.metadata.create_all(engine)
sqlalchemy 中使用 session 用于程序和数据库之间的会话,所有对象的载入和保存都需要通过session对象 。即 对表的所有操作,都是通过会话实现的。
通过 sessionmaker 调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源:
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
# 实例化
session = Session()
session 的常见操作方法包括:
>>> ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
>>> session.add(ed_user)
新增了一个用户,此时这个数据并没有被同步的数据库中,而是处于等待的状态。
上面代码,实例对象只是在环境的内存中有效,并没有在表中真正生成数据。
只有执行了 commit() 方法后,才会真正在数据表中创建数据。
如果我们查询数据库,则首先刷新所有待处理信息,然后立即发出查询。
>>> our_user = session.query(User).filter_by(name='ed').first()
>>> our_user
<User(name='ed', fullname='Ed Jones', nickname='edsnickname')>
此时得到的结果也并不是数据库表中的最终数据,而是映射类的一个对象。
增
add_user = Users("test", "test123@qq.com")
session.add(add_user)
session.commit()
session.add() 将会把 Model 加入当前 session 维护的持久空间(可以从session.dirty看到)中,直到commit 时提交到数据库。
add 之后执行 db.session.flush(),这样便可在session中get到对象的属性。
批量插入共有以下几种方法,对它们的批量做了比较,分别是:
session.add_all() < bulk_save_object() < bulk_insert_mappings() < SQLAlchemy_core()
查
查询是最常用的一个操作了,举个最简单的查询例子:
users = session.query(Users).filter_by(id=1).all()
for item in users:
print(item.name)
通常我们通过以上查询模式获取数据,需要注意的是,通过session.query()我们查询返回了一个Query对象,此时还没有去具体的数据库中查询,只有当执行具体的.all(),.first()等函数时才会真的去操作数据库。
其中,query 有 filter 和 filter_by 两个过滤方法,通常这两个方法都会用到的,
上述例子也可写为:
users = session.query(Users).filter_by(Users.id == 1).all()
改
更新数据有两种方法,一种是使用 query 中的 update 方法:
session.query(Users).filter_by(id=1).update({'name': "Jack"})
另一种是操作对应的表模型:
users = session.query(Users).filter_by(name="Jack").first()
users.name = "test"
session.add(users)
一般批量更新的话可以选前者,而要对查询获取对象属性之后再更新的场景就需要使用后者。
删
和更新数据类似,删除数据也有两种方法,第一种:
delete_users = session.query(Users).filter(Users.name == "test").first()
if delete_users:
session.delete(delete_users)
session.commit()
第二种:( 批量删除时推荐 )
session.query(Users).filter(Users.name == "test").delete()
session.commit()
在 commit() 之前,对实例对象的属性所做的更改,可以进行回滚,回到更改之前。
>>> session.rollback()
本质上只是把某一条数据(也就是映射类的实例)从内存中删除而已,并没有对数据库有任何操作。
通过 query 关键字查询。
>>> for instance in session.query(User).order_by(User.id):
... print(instance.name, instance.fullname)
ed Ed Jones
wendy Wendy Williams
mary Mary Contrary
fred Fred Flintstone
query.join() 连接查询
>>> session.query(User).join(Address).\
... filter(Address.email_address=='jack@google.com').\
... all()
[<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>]
query(column.label()) 可以为字段名(列)设置别名:
>>> for row in session.query(User.name.label('name_label')).all():
... print(row.name_label)
ed
wendy
mary
fred
aliased()为查询对象设置别名:
>>> from sqlalchemy.orm import aliased
>>> user_alias = aliased(User, name='user_alias')SQL>>> for row in session.query(user_alias, user_alias.name).all():
... print(row.user_alias)
<User(name='ed', fullname='Ed Jones', nickname='eddie')>
<User(name='wendy', fullname='Wendy Williams', nickname='windy')>
<User(name='mary', fullname='Mary Contrary', nickname='mary')>
<User(name='fred', fullname='Fred Flintstone', nickname='freddy')>
# 等于
query.filter(User.name == 'ed')# 不等于
query.filter(User.name != 'ed')# like和ilike
query.filter(User.name.like('%ed%'))
query.filter(User.name.ilike('%ed%')) # 不区分大小写# in
query.filter(User.name.in_(['ed', 'wendy', 'jack']))
query.filter(User.name.in_(
session.query(User.name).filter(User.name.like('%ed%'))
))
# not in
query.filter(~User.name.in_(['ed', 'wendy', 'jack']))# is
query.filter(User.name == None)
query.filter(User.name.is_(None))# is not
query.filter(User.name != None)
query.filter(User.name.is_not(None))# and
from sqlalchemy import and_
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')# or
from sqlalchemy import or_
query.filter(or_(User.name == 'ed', User.name == 'wendy'))# match
query.filter(User.name.match('wendy'))
1. 等值过滤器
session.query(Account).filter(Account.user_name=='Jack')
session.query(Account).filter(Account.salary==2000)2. 不等于过滤器(!=, <, >, <=, >=)
session.query(Account).filter(Account.user_name != 'Jack')
session.query(Account).filter(Account.salary != 2000)
session.query(Account).filter(Account.salary > 3000)3. 模糊查询(like)
模糊查询只适用于查询字符串类型,不适用于数值类型
#查询所有名字中包含字母i的用户
session.query(Account).filter(Account.user_name.like('%i%'))
#查询所有title中以Manager结尾的用户
session.query(Account).filter(Account.title.like('%Manager'))
#查询的有名字中以Da开头的用户
session.query(Account).filter(Account.user_name.like('Da%'))4. 包括过滤器(in_)
#查询id不为1,3,5的记录
session.query(Account).filter(~Account.id.in_([1,3,5]))
#查询工资不为2000,3000,4000的记录
session.query(Account).filter(~Account.salary.in_([2000,3000,4000]))
#查询所有title不为Engineer和Accountant的记录
session.query(Account).filter(~Account.title.in_(['Account','Engineer']))5. 判断是否为空(is NULL, is not NULL)
#查询salary为空值的记录
session.query(Account).filter(Account.salary.is_(None))
session.query(Account).filter(Account.salary == None)
#查询salary不为空值的记录
session.query(Account).filter(Account.salary.isnot(None))
session.query(Account).filter(Account.salary != None)6. 非逻辑 ~
#查询id不为1,3,5的记录
session.query(Account).filter(~Account.id.in_([1,3,5]))7. 与逻辑 (and_)
#直接多个条件查询
session.query(Account).filter(Account.title='Engineer', Account.salary==3000)
#用关键字and_进行与逻辑查询
from sqlalchemy import and_
session.query(Account).filter(and_(Account.title=='Engineer', Account.salary==3000))
#通过多个filter链接查询
session.query(Account).filter(Account.title=='Engineer').filter(Account.salary==3000)8. 或逻辑(or_)
from sqlalchemy import or_
#查询title是Engineer或者salary为3000的记录
session.query(Account).filter(or_(Account.title=='Engineer', Account.salary==3000))
文字字符串可以灵活地用于Query
查询。
from sqlalchemy import text
for user in session.query(User).filter(text("id<224")).order_by(text("id")).all():
print(user.name)
使用冒号指定绑定参数。要指定值,请使用
方法:Query.params()
session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(User.id).one()
<User(name='fred', fullname='Fred Flintstone', nickname='freddy')>
一个用户可以有多个邮件地址,意味着我们要新建一个表与用户表进行映射和查询。
>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy.orm import relationship>>> class Address(Base):
... __tablename__ = 'addresses'
... id = Column(Integer, primary_key=True)
... email_address = Column(String, nullable=False)
... user_id = Column(Integer, ForeignKey('users.id'))
...
... user = relationship("User", back_populates="addresses")
...
... def __repr__(self):
... return "<Address(email_address='%s')>" % self.email_address>>> User.addresses = relationship(
... "Address", order_by=Address.id, back_populates="user")
ForeignKey
定义两列之间依赖关系,表示关联了用户表的用户ID
relationship 告诉ORMAddress
类本身应链接到User
类,back_populates 表示引用的互补属性名,也就是本身的表名。
除了表的一对多,还存在多对多的关系,例如在一个博客网站中,有很多的博客BlogPost
,每篇博客有很多的Keyword
,每一个Keyword
又能对应很多博客。
对于普通的多对多,我们需要创建一个未映射的
构造以用作关联表。如下所示:Table
>>> from sqlalchemy import Table, Text
>>> # association table
>>> post_keywords = Table('post_keywords', Base.metadata,
... Column('post_id', ForeignKey('posts.id'), primary_key=True),
... Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
... )
下一步我们定义BlogPost
和Keyword
,使用互补 relationship 构造,每个引用post_keywords
表作为关联表:
>>> class BlogPost(Base):
... __tablename__ = 'posts'
...
... id = Column(Integer, primary_key=True)
... user_id = Column(Integer, ForeignKey('users.id'))
... headline = Column(String(255), nullable=False)
... body = Column(Text)
...
... # many to many BlogPost<->Keyword
... keywords = relationship('Keyword',
... secondary=post_keywords,
... back_populates='posts')
...
... def __init__(self, headline, body, author):
... self.author = author
... self.headline = headline
... self.body = body
...
... def __repr__(self):
... return "BlogPost(%r, %r, %r)" % (self.headline, self.body, self.author)
>>> class Keyword(Base):
... __tablename__ = 'keywords'
...
... id = Column(Integer, primary_key=True)
... keyword = Column(String(50), nullable=False, unique=True)
... posts = relationship('BlogPost',
... secondary=post_keywords,
... back_populates='keywords')
...
... def __init__(self, keyword):
... self.keyword = keyword
多对多关系的定义特征是secondary
关键字参数引用
表示关联表的对象。Table
- from sqlalchemy.dialects.mysql import INTEGER, VARCHAR
- from sqlalchemy import Table, Column, Date, Integer, String, ForeignKey
- from sqlalchemy import create_engine, MetaData
- from sqlalchemy.orm import declarative_base, sessionmaker
- from sqlalchemy_utils import database_exists, create_database
-
- ############################################################################
- conn_mysql_string = 'mysql+pymysql://root:root@127.0.0.1:3306'
- db_name = 'local_test'
- conn_db_string = f'{conn_mysql_string}/{db_name}'
- base_table = declarative_base()
-
-
- ############################################################################
- # users表结构
- class Users(base_table):
- __tablename__ = 'users'
-
- user_id = Column(INTEGER, primary_key=True)
- # user_id = Column(Integer, primary_key=True)
- user_name = Column(String(50))
- fullname = Column(String(50))
- nickname = Column(String(50))
- age = Column(Integer)
- place = Column(String(50), nullable=False)
- descript = Column(String(50), nullable=False)
-
- def __init__(self, user_id, user_name, fullname, nickname, age, place, descript):
- self.user_id = user_id
- self.user_name = user_name
- self.fullname = fullname
- self.nickname = nickname
- self.age = age
- self.place = place
- self.descript = descript
-
- def __repr__(self):
- return "<User(name='%s', fullname='%s', nickname='%s')>" % (
- self.name,
- self.fullname,
- self.nickname,
- )
-
-
- ############################################################################
-
-
- def create_db_1():
- # 创建引擎,并连接 mysql
- engine = create_engine(conn_mysql_string, encoding="utf-8", echo=True)
- engine.execute(f'CREATE DATABASE {db_name}')
- print('创建 "数据库" 成功')
-
-
- def create_db_2():
- with create_engine(conn_mysql_string, isolation_level='AUTOCOMMIT').connect() as connection:
- connection.execute(f'CREATE DATABASE {db_name} charset="utf8"')
-
-
- def create_db_3():
- # 利用 sqlalchemy_utils 库的 create_database 模块
- engine = create_engine(conn_db_string)
- if not database_exists(engine.url):
- create_database(engine.url)
- print(database_exists(engine.url))
-
-
- def create_table_1():
- # 创建引擎,并连接数据库
- engine = create_engine(conn_db_string, encoding="utf-8", echo=True)
- # declarative_base() 是 sqlalchemy 内部封装的一个方法
- # 通过其构造一个基类,这个基类以及它的子类,可以将Python类和数据库表关联映射起来。
- base_table.metadata.create_all(engine)
- pass
-
-
- def create_table_2():
- engine = create_engine(conn_db_string, encoding="utf-8", echo=True)
- # 绑定引擎
- metadata = MetaData(engine)
- # 定义表格
- user_table = Table(
- 'user', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(50)),
- Column('fullname', String(100))
- )
-
- address_table = Table(
- 'address', metadata,
- Column('id', Integer, primary_key=True),
- Column('user_id', None, ForeignKey('user.id')),
- Column('email', String(128), nullable=False)
- )
- metadata.create_all()
- pass
-
-
- def insert_data():
- # 初始化数据库连接
- engine = create_engine(conn_db_string, encoding="utf-8")
- # 创建 DBSession类型
- db_session = sessionmaker(bind=engine)
- # 创建session对象
- session = db_session()
- # 插入单条数据
- # 创建新User对象
- new_user = Users(
- user_id=1, user_name='Jack', fullname='fullname', nickname='nickname',
- age=25, place='USA', descript='descript'
- )
- # 添加到session
- session.add(new_user)
- # 提交即保存到数据库
- session.commit()
-
- # 插入多条数据
- user_list = [
- Users(user_id=2, user_name='Green', fullname='fullname', nickname='nickname', age=26, place='UK',
- descript='descript'),
- Users(user_id=3, user_name='Alex', fullname='fullname', nickname='nickname', age=31, place='GER',
- descript='descript'),
- Users(user_id=4, user_name='Chen', fullname='fullname', nickname='nickname', age=52, place='CHN',
- descript='descript'),
- Users(user_id=5, user_name='Zhang', fullname='fullname', nickname='nickname', age=42, place='CHN',
- descript='descript')
- ]
- session.add_all(user_list)
- session.commit()
- # 关闭session
- session.close()
- print('数据插入成功')
-
-
- def query_data():
- # 初始化数据库连接
- engine = create_engine(conn_db_string, encoding="utf-8")
- # 创建 DBSession类型
- db_session = sessionmaker(bind=engine)
- # 创建session对象
- session = db_session()
- # 查询所有place是CHN的人名
- # 创建Query查询,filter是where条件
- # 调用one()返回唯一行,如果调用all()则返回所有行:
- users = session.query(Users).filter(Users.place == 'CHN').all()
- print([use.user_name for use in users])
- # 或者用如下查询
- users = session.query(Users.user_name).filter(Users.place == 'CHN').all()
- print(users)
- session.close()
-
-
- def update_data():
- # 初始化数据库连接
- engine = create_engine(conn_db_string, encoding="utf-8")
- # 创建 DBSession类型
- db_session = sessionmaker(bind=engine)
- # 创建session对象
- session = db_session()
- # 数据更新,将Jack的place修改为CHN
- update_obj = session.query(Users).filter(Users.user_name == 'Jack').update({"place": "CHN"})
- session.commit()
- session.close()
- print("更新数据成功")
-
-
- def delete_data():
- # 初始化数据库连接
- engine = create_engine(conn_db_string, encoding="utf-8")
- # 创建 DBSession类型
- db_session = sessionmaker(bind=engine)
- # 创建session对象
- session = db_session()
- # 数据更新,将Jack的记录删除
- update_obj = session.query(Users).filter(Users.name == 'Jack').delete()
- session.commit()
- session.close()
- print("Delete data successfully!")
-
-
- if __name__ == '__main__':
- # create_db_1()
- # create_db_2()
- # create_db_3()
- # create_table_1()
- # create_table_2()
- # insert_data()
- query_data()
- # update_data()
- # delete_data()
- pass

SQLAlchemy 不允许修改表结构,如果需要修改表结构则必须删除旧表,再创建新表,或者执行原生的 SQL 语句 ALERT TABLE 进行修改。这意味着在使用非原生SQL语句修改表结构时,表中已有的所有记录将会丢失,所以我们最好一次性的设计好整个表结构避免后期修改:
- # models.py
- import datetime
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy.orm import scoped_session
-
- from sqlalchemy import (
- create_engine,
- Column,
- Integer,
- String,
- Enum,
- DECIMAL,
- DateTime,
- Boolean,
- UniqueConstraint,
- Index,
- )
- from sqlalchemy.ext.declarative import declarative_base
-
- # 基础类
- Base = declarative_base()
-
- # 创建引擎
- engine = create_engine(
- "mysql+pymysql://tom:123@192.168.0.120:3306/db1?charset=utf8mb4",
- # "mysql+pymysql://tom@127.0.0.1:3306/db1?charset=utf8mb4", # 无密码时
- # 超过链接池大小外最多创建的链接
- max_overflow=0,
- # 链接池大小
- pool_size=5,
- # 链接池中没有可用链接则最多等待的秒数,超过该秒数后报错
- pool_timeout=10,
- # 多久之后对链接池中的链接进行一次回收
- pool_recycle=1,
- # 查看原生语句(未格式化)
- echo=True,
- )
-
- # 绑定引擎
- Session = sessionmaker(bind=engine)
- # 创建数据库链接池,直接使用session即可为当前线程拿出一个链接对象conn
- # 内部会采用threading.local进行隔离
- session = scoped_session(Session)
-
-
- class UserInfo(Base):
- """必须继承Base"""
-
- # 数据库中存储的表名
- __tablename__ = "userInfo"
- # 对于必须插入的字段,采用nullable=False进行约束,它相当于NOT NULL
- id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
- name = Column(String(32), index=True, nullable=False, comment="姓名")
- age = Column(Integer, nullable=False, comment="年龄")
- phone = Column(DECIMAL(6), nullable=False, unique=True, comment="手机号")
- address = Column(String(64), nullable=False, comment="地址")
- # 对于非必须插入的字段,不用采取nullable=False进行约束
- gender = Column(Enum("male", "female"), default="male", comment="性别")
- create_time = Column(DateTime, default=datetime.datetime.now, comment="创建时间")
- last_update_time = Column(
- DateTime, onupdate=datetime.datetime.now, comment="最后更新时间"
- )
- delete_status = Column(Boolean(), default=False, comment="是否删除")
-
- __table__args__ = (
- UniqueConstraint("name", "age", "phone"), # 联合唯一约束
- Index("name", "addr", unique=True), # 联合唯一索引
- )
-
- def __str__(self):
- return f"object : <id:{self.id} name:{self.name}>"
-
-
- if __name__ == "__main__":
- # 删除表
- Base.metadata.drop_all(engine)
- # 创建表
- Base.metadata.create_all(engine)

新增单条记录:
- # 获取链接池、ORM表对象
- import models
-
-
- user_instance = models.UserInfo(
- name="Jack",
- age=18,
- phone=330621,
- address="Beijing",
- gender="male"
- )
-
- models.session.add(user_instance)
-
- # 提交
- models.session.commit()
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

批量新增能减少TCP链接次数,提升插入性能:
- # 获取链接池、ORM表对象
- import models
-
-
- user_instance1 = models.UserInfo(
- name="Tom",
- age=19,
- phone=330624,
- address="Shanghai",
- gender="male"
- )
-
- user_instance2 = models.UserInfo(
- name="Mary",
- age=20,
- phone=330623,
- address="Chongqing",
- gender="female"
- )
-
-
- models.session.add_all(
- (
- user_instance1,
- user_instance2
- )
- )
-
- # 提交
- models.session.commit()
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

修改某些记录:
- # 获取链接池、ORM表对象
- import models
-
- # 修改的信息:
- # - Jack -> Jack + son
- # 在SQLAlchemy中,四则运算符号只能用于数值类型
- # 如果是字符串类型需要在原本的基础值上做改变,必须设置
- # - age -> age + 1
- # synchronize_session=False
-
- models.session.query(models.UserInfo)\
- .filter_by(name="Jack")\
- .update(
- {
- "name": models.UserInfo.name + "son",
- "age": models.UserInfo.age + 1
- },
- synchronize_session=False
- )
- # 本次修改具有字符串字段在原值基础上做更改的操作,所以必须添加
- # synchronize_session=False
- # 如果只修改年龄,则不用添加
-
- # 提交
- models.session.commit()
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

删除记录用的比较少,了解即可,一般都是像上面那样增加一个delete_status的字段,如果为1则代表删除:
- # 获取链接池、ORM表对象
- import models
-
- models.session.query(models.UserInfo).filter_by(name="Mary").delete()
-
- # 提交
- models.session.commit()
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()
查所有记录、所有字段,all()方法将返回一个列表,内部包裹着每一行的记录对象:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(models.UserInfo)\
- .all()
-
- print(result)
- # [<models.UserInfo object at 0x7f4d3d606fd0>, <models.UserInfo object at 0x7f4d3d606f70>]
-
- for row in result:
- print(row)
- # object : <id:1 name:Jackson>
- # object : <id:2 name:Tom>
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

查所有记录、某些字段(注意,下面返回的元组实际上是一个命名元组,可以直接通过.操作符进行操作):
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo.id,
- models.UserInfo.name,
- models.UserInfo.age
- ).all()
-
- print(result)
- # [(1, 'Jackson', 19), (2, 'Tom', 19)]
-
- for row in result:
- print(row)
- # (1, 'Jackson', 19)
- # (2, 'Tom', 19)
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

只拿第一条记录,first()方法将返回单条记录对象(注意,下面返回的元组实际上是一个命名元组,可以直接通过.操作符进行操作):
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo.id,
- models.UserInfo.name,
- models.UserInfo.age
- ).first()
-
- print(result)
- # (1, 'Jackson', 19)
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()
通过字段的label()方法,我们可以为它取一个别名:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo.name.label("s_name"),
- models.UserInfo.age.label("s_age")
- ).all()
-
- for row in result:
- print(row.s_name)
- print(row.s_age)
-
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()
一个条件的过滤:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo,
- ).filter(
- models.UserInfo.name == "Jackson"
- ).all()
-
- # 上面是Python语句形式的过滤条件,由filter方法调用
- # 亦可以使用ORM的形式进行过滤,通过filter_by方法调用
- # 如下所示
- # .filter_by(name="Jackson").all()
- # 个人更推荐使用filter过滤,它看起来更直观,更简单,可以支持 == != > < >= <=等常见符号
-
- # 过滤成功的结果数量
- print(len(result))
- # 1
-
- # 过滤成功的结果
- print(result)
- # [<models.UserInfo object at 0x7f11391ea2b0>]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

AND查询:
- # 获取链接池、ORM表对象
- import models
- # 导入AND
- from sqlalchemy import and_
-
- result = models.session.query(
- models.UserInfo,
- ).filter(
- and_(
- models.UserInfo.name == "Jackson",
- models.UserInfo.gender == "male"
- )
- ).all()
-
- # 过滤成功的结果数量
- print(len(result))
- # 1
-
- # 过滤成功的结果
- print(result)
- # [<models.UserInfo object at 0x7f11391ea2b0>]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

OR查询:
- # 获取链接池、ORM表对象
- import models
- # 导入OR
- from sqlalchemy import or_
-
- result = models.session.query(
- models.UserInfo,
- ).filter(
- or_(
- models.UserInfo.name == "Jackson",
- models.UserInfo.gender == "male"
- )
- ).all()
-
- # 过滤成功的结果数量
- print(len(result))
- # 1
-
- # 过滤成功的结果
- print(result)
- # [<models.UserInfo object at 0x7f11391ea2b0>]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

NOT查询:
- # 获取链接池、ORM表对象
- import models
- # 导入NOT
- from sqlalchemy import not_
-
- result = models.session.query(
- models.UserInfo,
- ).filter(
- not_(
- models.UserInfo.name == "Jackson",
- )
- ).all()
-
- # 过滤成功的结果数量
- print(len(result))
- # 1
-
- # 过滤成功的结果
- print(result)
- # [<models.UserInfo object at 0x7f11391ea2b0>]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

BETWEEN查询:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo,
- ).filter(
- models.UserInfo.age.between(15, 21)
- ).all()
-
- # 过滤成功的结果数量
- print(len(result))
- # 1
-
- # 过滤成功的结果
- print(result)
- # [<models.UserInfo object at 0x7f11391ea2b0>]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

IN查询:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo,
- ).filter(
- models.UserInfo.age.in_((18, 19, 20))
- ).all()
-
- # 过滤成功的结果数量
- print(len(result))
- # 2
-
- # 过滤成功的结果
- print(result)
- # [<models.UserInfo object at 0x7fdeeaa774f0>, <models.UserInfo object at 0x7fdeeaa77490>]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

NOT IN,只需要加上~即可:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo,
- ).filter(
- ~models.UserInfo.age.in_((18, 19, 20))
- ).all()
-
- # 过滤成功的结果数量
- print(len(result))
- # 0
-
- # 过滤成功的结果
- print(result)
- # []
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

LIKE查询:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo,
- ).filter(
- models.UserInfo.name.like("Jack%")
- ).all()
-
- # 过滤成功的结果数量
- print(len(result))
- # 1
-
- # 过滤成功的结果
- print(result)
- # [<models.UserInfo object at 0x7fee1614f4f0>]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

对结果all()返回的列表进行一次切片即可:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo,
- ).all()[0:1]
-
- # 过滤成功的结果数量
- print(len(result))
- # 1
-
- # 过滤成功的结果
- print(result)
- # [<models.UserInfo object at 0x7fee1614f4f0>]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

ASC升序、DESC降序,需要指定排序规则:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.UserInfo,
- ).filter(
- models.UserInfo.age > 12
- ).order_by(
- models.UserInfo.age.desc()
- ).all()
-
- # 过滤成功的结果数量
- print(len(result))
- # 2
-
- # 过滤成功的结果
- print(result)
- # [<models.UserInfo object at 0x7f90eccd26d0>, <models.UserInfo object at 0x7f90eccd2670>]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

聚合分组与having过滤:
- # 获取链接池、ORM表对象
- import models
- # 导入聚合函数
- from sqlalchemy import func
-
- result = models.session.query(
- func.sum(models.UserInfo.age)
- ).group_by(
- models.UserInfo.gender
- ).having(
- func.sum(models.UserInfo.id > 1)
- ).all()
-
- # 过滤成功的结果数量
- print(len(result))
- # 1
-
- # 过滤成功的结果
- print(result)
- # [(Decimal('38'),)]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

五表关系:
建表语句:
- # models.py
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy.orm import scoped_session
- from sqlalchemy.orm import relationship
-
- from sqlalchemy import (
- create_engine,
- Column,
- Integer,
- Date,
- String,
- Enum,
- ForeignKey,
- UniqueConstraint,
- )
- from sqlalchemy.ext.declarative import declarative_base
-
- # 基础类
- Base = declarative_base()
-
- # 创建引擎
- engine = create_engine(
- "mysql+pymysql://tom:123@192.168.0.120:3306/db1?charset=utf8mb4",
- # "mysql+pymysql://tom@127.0.0.1:3306/db1?charset=utf8mb4", # 无密码时
- # 超过链接池大小外最多创建的链接
- max_overflow=0,
- # 链接池大小
- pool_size=5,
- # 链接池中没有可用链接则最多等待的秒数,超过该秒数后报错
- pool_timeout=10,
- # 多久之后对链接池中的链接进行一次回收
- pool_recycle=1,
- # 查看原生语句
- # echo=True
- )
-
- # 绑定引擎
- Session = sessionmaker(bind=engine)
- # 创建数据库链接池,直接使用session即可为当前线程拿出一个链接对象
- # 内部会采用threading.local进行隔离
- session = scoped_session(Session)
-
-
- class StudentsNumberInfo(Base):
- """学号表"""
- __tablename__ = "studentsNumberInfo"
- id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
- number = Column(Integer, nullable=False, unique=True, comment="学生编号")
- admission = Column(Date, nullable=False, comment="入学时间")
- graduation = Column(Date, nullable=False, comment="毕业时间")
-
-
- class TeachersInfo(Base):
- """教师表"""
- __tablename__ = "teachersInfo"
- id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
- number = Column(Integer, nullable=False, unique=True, comment="教师编号")
- name = Column(String(64), nullable=False, comment="教师姓名")
- gender = Column(Enum("male", "female"), nullable=False, comment="教师性别")
- age = Column(Integer, nullable=False, comment="教师年龄")
-
-
- class ClassesInfo(Base):
- """班级表"""
- __tablename__ = "classesInfo"
- id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
- number = Column(Integer, nullable=False, unique=True, comment="班级编号")
- name = Column(String(64), nullable=False, unique=True, comment="班级名称")
- # 一对一关系必须为连接表的连接字段创建UNIQUE的约束,这样才能是一对一,否则是一对多
- fk_teacher_id = Column(
- Integer,
- ForeignKey(
- "teachersInfo.id",
- ondelete="CASCADE",
- onupdate="CASCADE",
- ),
- nullable=False,
- unique=True,
- comment="班级负责人"
- )
- # 下面这2个均属于逻辑字段,适用于正反向查询。在使用ORM的时候,我们不必每次都进行JOIN查询,而恰好正反向的查询使用频率会更高
- # 这种逻辑字段不会在物理层面上创建,它只适用于查询,本身不占据任何数据库的空间
- # sqlalchemy的正反向概念与Django有所不同,Django是外键字段在那边,那边就作为正
- # 而sqlalchemy是relationship字段在那边,那边就作为正
- # 比如班级表拥有 relationship 字段,而老师表不曾拥有
- # 那么用班级表的这个relationship字段查老师时,就称为正向查询
- # 反之,如果用老师来查班级,就称为反向查询
- # 另外对于这个逻辑字段而言,根据不同的表关系,创建的位置也不一样:
- # - 1 TO 1:建立在任意一方均可,查询频率高的一方最好
- # - 1 TO M:建立在M的一方
- # - M TO M:中间表中建立2个逻辑字段,这样任意一方都可以先反向,再正向拿到另一方
- # - 遵循一个原则,ForeignKey建立在那个表上,那个表上就建立relationship
- # - 有几个ForeignKey,就建立几个relationship
- # 总而言之,使用ORM与原生SQL最直观的区别就是正反向查询能带来更高的代码编写效率,也更加简单
- # 甚至我们可以不用外键约束,只创建这种逻辑字段,让表与表之间的耦合度更低,但是这样要避免脏数据的产生
-
- # 班级负责人,这里是一对一关系,一个班级只有一个负责人
- leader_teacher = relationship(
- # 正向查询时所链接的表,当使用 classesInfo.leader_teacher 时,它将自动指向fk的那一条记录
- "TeachersInfo",
- # 反向查询时所链接的表,当使用 teachersInfo.leader_class 时,它将自动指向该老师所管理的班级
- backref="leader_class",
- )
-
-
- class ClassesAndTeachersRelationship(Base):
- """任教老师与班级的关系表"""
- __tablename__ = "classesAndTeachersRelationship"
- id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
- # 中间表中注意不要设置单列的UNIQUE约束,否则就会变为一对一
- fk_teacher_id = Column(
- Integer,
- ForeignKey(
- "teachersInfo.id",
- ondelete="CASCADE",
- onupdate="CASCADE",
- ),
- nullable=False,
- comment="教师记录"
- )
-
- fk_class_id = Column(
- Integer,
- ForeignKey(
- "classesInfo.id",
- ondelete="CASCADE",
- onupdate="CASCADE",
- ),
- nullable=False,
- comment="班级记录"
- )
- # 多对多关系的中间表必须使用联合唯一约束,防止出现重复数据
- __table_args__ = (
- UniqueConstraint("fk_teacher_id", "fk_class_id"),
- )
-
- # 逻辑字段
- # 给班级用的,查看所有任教老师
- mid_to_teacher = relationship(
- "TeachersInfo",
- backref="mid",
- )
-
- # 给老师用的,查看所有任教班级
- mid_to_class = relationship(
- "ClassesInfo",
- backref="mid"
- )
-
-
- class StudentsInfo(Base):
- """学生信息表"""
- __tablename__ = "studentsInfo"
- id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
- name = Column(String(64), nullable=False, comment="学生姓名")
- gender = Column(Enum("male", "female"), nullable=False, comment="学生性别")
- age = Column(Integer, nullable=False, comment="学生年龄")
- # 外键约束
- # 一对一关系必须为连接表的连接字段创建UNIQUE的约束,这样才能是一对一,否则是一对多
- fk_student_id = Column(
- Integer,
- ForeignKey(
- "studentsNumberInfo.id",
- ondelete="CASCADE",
- onupdate="CASCADE"
- ),
- nullable=False,
- comment="学生编号"
- )
- # 相比于一对一,连接表的连接字段不用UNIQUE约束即为多对一关系
- fk_class_id = Column(
- Integer,
- ForeignKey(
- "classesInfo.id",
- ondelete="CASCADE",
- onupdate="CASCADE"
- ),
- comment="班级编号"
- )
- # 逻辑字段
- # 所在班级, 这里是一对多关系,一个班级中可以有多名学生
- from_class = relationship(
- "ClassesInfo",
- backref="have_student",
- )
- # 学生学号,这里是一对一关系,一个学生只能拥有一个学号
- number_info = relationship(
- "StudentsNumberInfo",
- backref="student_info",
- )
-
-
- if __name__ == "__main__":
- # 删除表
- Base.metadata.drop_all(engine)
- # 创建表
- Base.metadata.create_all(engine)

插入数据:
- # 获取链接池、ORM表对象
- import models
- import datetime
-
-
- models.session.add_all(
- (
- # 插入学号表数据
- models.StudentsNumberInfo(
- number=160201,
- admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
- graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
- ),
- models.StudentsNumberInfo(
- number=160101,
- admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
- graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
- ),
- models.StudentsNumberInfo(
- number=160301,
- admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
- graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
- ),
- models.StudentsNumberInfo(
- number=160102,
- admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
- graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
- ),
- models.StudentsNumberInfo(
- number=160302,
- admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
- graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
- ),
- models.StudentsNumberInfo(
- number=160202,
- admission=datetime.datetime.date(datetime.datetime(2016, 9, 1)),
- graduation=datetime.datetime.date(datetime.datetime(2021, 6, 15))
- ),
- # 插入教师表数据
- models.TeachersInfo(
- number=3341, name="David", gender="male", age=32,
- ),
- models.TeachersInfo(
- number=3342, name="Jason", gender="male", age=30,
- ),
- models.TeachersInfo(
- number=3343, name="Lisa", gender="female", age=28,
- ),
- # 插入班级表数据
- models.ClassesInfo(
- number=1601, name="one year one class", fk_teacher_id=1
- ),
- models.ClassesInfo(
- number=1602, name="one year two class", fk_teacher_id=2
- ),
- models.ClassesInfo(
- number=1603, name="one year three class", fk_teacher_id=3
- ),
- # 插入中间表数据
- models.ClassesAndTeachersRelationship(
- fk_class_id=1, fk_teacher_id=1
- ),
- models.ClassesAndTeachersRelationship(
- fk_class_id=2, fk_teacher_id=1
- ),
- models.ClassesAndTeachersRelationship(
- fk_class_id=3, fk_teacher_id=1
- ),
- models.ClassesAndTeachersRelationship(
- fk_class_id=1, fk_teacher_id=2
- ),
- models.ClassesAndTeachersRelationship(
- fk_class_id=3, fk_teacher_id=3
- ),
- # 插入学生表数据
- models.StudentsInfo(
- name="Jack", gender="male", age=17, fk_student_id=1, fk_class_id=2
- ),
- models.StudentsInfo(
- name="Tom", gender="male", age=18, fk_student_id=2, fk_class_id=1
- ),
- models.StudentsInfo(
- name="Mary", gender="female", age=16, fk_student_id=3,
- fk_class_id=3
- ),
- models.StudentsInfo(
- name="Anna", gender="female", age=17, fk_student_id=4,
- fk_class_id=1
- ),
- models.StudentsInfo(
- name="Bobby", gender="male", age=18, fk_student_id=6, fk_class_id=2
- ),
- )
- )
-
- models.session.commit()
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

INNER JOIN:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.StudentsInfo.name,
- models.StudentsNumberInfo.number,
- models.ClassesInfo.number
- ).join(
- models.StudentsNumberInfo,
- models.StudentsInfo.fk_student_id == models.StudentsNumberInfo.id
- ).join(
- models.ClassesInfo,
- models.StudentsInfo.fk_class_id == models.ClassesInfo.id
- ).all()
-
- print(result)
- # [('Jack', 160201, 1602), ('Tom', 160101, 1601), ('Mary', 160301, 1603), ('Anna', 160102, 1601), ('Bobby', 160202, 1602)]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

LEFT JOIN只需要在每个JOIN中指定isouter关键字参数为True即可:
- session.query(
- 左表.字段,
- 右表.字段
- )
- .join(
- 右表,
- 链接条件,
- isouter=True
- ).all()
RIGHT JOIN需要换表的位置,SQLALchemy本身并未提供RIGHT JOIN,所以使用时一定要注意驱动顺序,小表驱动大表(如果不注意顺序,MySQL优化器内部也会优化):
- session.query(
- 左表.字段,
- 右表.字段
- )
- .join(
- 左表,
- 链接条件,
- isouter=True
- ).all()
将多个查询结果联合起来,必须使用filter(),后面不加all()方法。
因为all()会返回一个列表,而filter()返回的是一个<class 'sqlalchemy.orm.query.Query'>查询对象,此外,必须单拿某一个字段,不能不指定字段直接query():
- # 获取链接池、ORM表对象
- import models
-
- students_name = models.session.query(models.StudentsInfo.name).filter()
- students_number = models.session.query(models.StudentsNumberInfo.number)\
- .filter()
- class_name = models.session.query(models.ClassesInfo.name).filter()
-
- result = students_name.union_all(students_number).union_all(class_name)
-
- print(result.all())
- # [
- # ('Jack',), ('Tom',), ('Mary',), ('Anna',), ('Bobby',),
- # ('160101',), ('160102',), ('160201',), ('160202',), ('160301',), ('160302',),
- # ('one year one class',), ('one year three class',), ('one year two class',)
- # ]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

子查询使用subquery()实现,如下所示,查询每个班级中年龄最小的人:
- # 获取链接池、ORM表对象
- import models
- from sqlalchemy import func
-
- # 子查询中所有字段的访问都需要加上c的前缀
- # 如 sub_query.c.id、 sub_query.c.name等
- sub_query = models.session.query(
- # 使用label()来为字段AS一个别名
- # 后续访问需要通过sub_query.c.alias进行访问
- func.min(models.StudentsInfo.age).label("min_age"),
- models.ClassesInfo.id,
- models.ClassesInfo.name
- ).join(
- models.ClassesInfo,
- models.StudentsInfo.fk_class_id == models.ClassesInfo.id
- ).group_by(
- models.ClassesInfo.id
- ).subquery()
-
-
- result = models.session.query(
- models.StudentsInfo.name,
- sub_query.c.min_age,
- sub_query.c.name
- ).join(
- sub_query,
- sub_query.c.id == models.StudentsInfo.fk_class_id
- ).filter(
- sub_query.c.min_age == models.StudentsInfo.age
- )
-
- print(result.all())
- # [('Jack', 17, 'one year two class'), ('Mary', 16, 'one year three class'), ('Anna', 17, 'one year one class')]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

上面我们都是通过JOIN进行查询的,实际上我们也可以通过逻辑字段relationship进行查询。
下面是正向查询的示例,正向查询是指从有relationship逻辑字段的表开始查询:
- # 查询所有学生的所在班级,我们可以通过学生的from_class字段拿到其所在班级
- # 另外,对于学生来说,班级只能有一个,所以have_student应当是一个对象
-
- # 获取链接池、ORM表对象
- import models
-
- students_lst = models.session.query(
- models.StudentsInfo
- ).all()
-
- for row in students_lst:
- print(f"""
- student name : {row.name}
- from : {row.from_class.name}
- """)
-
- # student name : Mary
- # from : one year three class
-
- # student name : Anna
- # from : one year one class
-
- # student name : Bobby
- # from : one year two class
-
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

下面是反向查询的示例,反向查询是指从没有relationship逻辑字段的表开始查询:
- # 查询所有班级中的所有学生,学生表中有relationship,并且它的backref为have_student,所以我们可以通过班级.have_student来获取所有学生记录
-
- # 另外,对于班级来说,学生可以有多个,所以have_student应当是一个序列
-
- # 获取链接池、ORM表对象
- import models
-
- classes_lst = models.session.query(
- models.ClassesInfo
- ).all()
-
- for row in classes_lst:
- print("class name :", row.name)
- for student in row.have_student:
- print("student name :", student.name)
-
- # class name : one year one class
- # student name : Jack
- # student name : Anna
- # class name : one year two class
- # student name : Tom
- # class name : one year three class
- # student name : Mary
- # student name : Bobby
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

总结,正向查询的逻辑字段总是得到一个对象,反向查询的逻辑字段总是得到一个列表。
使用逻辑字段relationship可以直接对一些跨表记录进行增删改查。
由于逻辑字段是一个类似于列表的存在(仅限于反向查询,正向查询总是得到一个对象),所以列表的绝大多数方法都能用。
- <class 'sqlalchemy.orm.collections.InstrumentedList'>
- - append()
- - clear()
- - copy()
- - count()
- - extend()
- - index()
- - insert()
- - pop()
- - remove()
- - reverse()
- - sort()
下面不再进行实机演示,因为我们上面的几张表中做了很多约束。
- # 比如
- # 给老师增加班级
- result = session.query(Teachers).first()
- # extend方法:
- result.re_class.extend([
- Classes(name="三年级一班",),
- Classes(name="三年级二班",),
- ])
-
- # 比如
- # 减少老师所在的班级
- result = session.query(Teachers).first()
-
- # 待删除的班级对象,集合查找比较快
- delete_class_set = {
- session.query(Classes).filter_by(id=7).first(),
- session.query(Classes).filter_by(id=8).first(),
- }
-
- # 循换老师所在的班级
- # remove方法:
- for class_obj in result.re_class:
- if class_obj in delete_class_set:
- result.re_class.remove(class_obj)
-
- # 比如
- # 清空老师所任教的所有班级
- # 拿出一个老师
- result = session.query(Teachers).first()
- result.re_class.clear()

1)查看每个班级共有多少学生:
JOIN查询:
- # 获取链接池、ORM表对象
- import models
-
- from sqlalchemy import func
-
- result = models.session.query(
- models.ClassesInfo.name,
- func.count(models.StudentsInfo.id)
- ).join(
- models.StudentsInfo,
- models.ClassesInfo.id == models.StudentsInfo.fk_class_id
- ).group_by(
- models.ClassesInfo.id
- ).all()
-
- print(result)
- # [('one year one class', 2), ('one year two class', 2), ('one year three class', 1)]
-
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

正反查询:
- # 获取链接池、ORM表对象
- import models
-
- result = {}
- class_lst = models.session.query(
- models.ClassesInfo
- ).all()
-
- for row in class_lst:
- for student in row.have_student:
- count = result.setdefault(row.name, 0)
- result[row.name] = count + 1
-
- print(result.items())
- # dict_items([('one year one class', 2), ('one year two class', 2), ('one year three class', 1)])
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

2)查看每个学生的入学、毕业年份以及所在的班级名称:
JOIN查询:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.StudentsNumberInfo.number,
- models.StudentsInfo.name,
- models.ClassesInfo.name,
- models.StudentsNumberInfo.admission,
- models.StudentsNumberInfo.graduation
- ).join(
- models.StudentsInfo,
- models.StudentsInfo.fk_class_id == models.ClassesInfo.id
- ).join(
- models.StudentsNumberInfo,
- models.StudentsNumberInfo.id == models.StudentsInfo.fk_student_id
- ).order_by(
- models.StudentsNumberInfo.number.asc()
- ).all()
-
- print(result)
- # [
- # (160101, 'Tom', 'one year one class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
- # (160102, 'Anna', 'one year one class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
- # (160201, 'Jack', 'one year two class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
- # (160202, 'Bobby', 'one year two class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
- # (160301, 'Mary', 'one year three class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15))
- # ]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

正反查询:
- # 获取链接池、ORM表对象
- import models
-
- result = []
-
- student_lst = models.session.query(
- models.StudentsInfo
- ).all()
-
- for row in student_lst:
- result.append((
- row.number_info.number,
- row.name,
- row.from_class.name,
- row.number_info.admission,
- row.number_info.graduation
- ))
-
- print(result)
- # [
- # (160101, 'Tom', 'one year one class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
- # (160102, 'Anna', 'one year one class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
- # (160201, 'Jack', 'one year two class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
- # (160202, 'Bobby', 'one year two class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15)),
- # (160301, 'Mary', 'one year three class', datetime.date(2016, 9, 1), datetime.date(2021, 6, 15))
- # ]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

3)查看David所教授的学生中年龄最小的学生:
JOIN查询:
- # 获取链接池、ORM表对象
- import models
-
- result = models.session.query(
- models.TeachersInfo.name,
- models.StudentsInfo.name,
- models.StudentsInfo.age,
- models.ClassesInfo.name
- ).join(
- models.ClassesAndTeachersRelationship,
- models.ClassesAndTeachersRelationship.fk_class_id == models.ClassesInfo.id
- ).join(
- models.TeachersInfo,
- models.ClassesAndTeachersRelationship.fk_teacher_id == models.TeachersInfo.id
- ).join(
- models.StudentsInfo,
- models.StudentsInfo.fk_class_id == models.ClassesInfo.id
- ).filter(
- models.TeachersInfo.name == "David"
- ).order_by(
- models.StudentsInfo.age.asc(),
- models.StudentsInfo.id.asc()
- ).limit(1).all()
-
- print(result)
- # [('David', 'Mary', 16, 'one year three class')]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

正反查询:
- # 获取链接池、ORM表对象
- import models
-
- david = models.session.query(
- models.TeachersInfo
- ).filter(
- models.TeachersInfo.name == "David"
- ).first()
-
- student_lst = []
-
- # 反向查询拿到任教班级,反向是一个列表,所以直接for
- for row in david.mid:
- cls = row.mid_to_class
- # 通过任教班级,反向拿到其下的所有学生
- cls_students = cls.have_student
- # 遍历学生
- for student in cls_students:
- student_lst.append(
- (
- david.name,
- student.name,
- student.age,
- cls.name
- )
- )
-
- # 筛选出年龄最小的
- min_age_student_lst = sorted(
- student_lst, key=lambda tpl: tpl[2])[0]
-
- print(min_age_student_lst)
- # ('David', 'Mary', 16, 'one year three class')
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

4)查看每个班级的负责人是谁,以及任课老师都有谁:
JOIN查询:
- # 获取链接池、ORM表对象
- import models
-
- from sqlalchemy import func
-
- # 先查任课老师
- sub_query = models.session.query(
- models.ClassesAndTeachersRelationship.fk_class_id.label("class_id"),
- func.group_concat(models.TeachersInfo.name).label("have_teachers")
- ).join(
- models.ClassesInfo,
- models.ClassesAndTeachersRelationship.fk_class_id == models.ClassesInfo.id
- ).join(
- models.TeachersInfo,
- models.ClassesAndTeachersRelationship.fk_teacher_id == models.TeachersInfo.id
- ).group_by(
- models.ClassesAndTeachersRelationship.fk_class_id
- ).subquery()
-
- result = models.session.query(
- models.ClassesInfo.name.label("class_name"),
- models.TeachersInfo.name.label("leader_teacher"),
- sub_query.c.have_teachers.label("have_teachers")
- ).join(
- models.TeachersInfo,
- models.ClassesInfo.fk_teacher_id == models.TeachersInfo.id
- ).join(
- sub_query,
- sub_query.c.class_id == models.ClassesInfo.id
- ).all()
-
- print(result)
- # [('one year one class', 'David', 'Jason,David'), ('one year two class', 'Jason', 'David'), ('one year three class', 'Lisa', 'David,Lisa')]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

正反查询:
- # 获取链接池、ORM表对象
- import models
-
- result = []
-
- # 获取所有班级
- classes_lst = models.session.query(
- models.ClassesInfo
- ).all()
-
- for cls in classes_lst:
- cls_message = [
- cls.name,
- cls.leader_teacher.name,
- [],
- ]
- for row in cls.mid:
- cls_message[-1].append(row.mid_to_teacher.name)
- result.append(cls_message)
-
- print(result)
- # [['one year one class', 'David', ['David', 'Jason']], ['one year two class', 'Jason', ['David']], ['one year three class', 'Lisa', ['David', 'Lisa']]]
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close()

如果一条查询语句是filter()结尾,则该对象的__str__方法会返回格式化后的查询语句:
- print(
- models.session.query(models.StudentsInfo).filter()
- )
-
- SELECT `studentsInfo`.id AS `studentsInfo_id`, `studentsInfo`.name AS `studentsInfo_name`, `studentsInfo`.gender AS `studentsInfo_gender`, `studentsInfo`.age AS `studentsInfo_age`, `studentsInfo`.fk_student_id AS `studentsInfo_fk_student_id`, `studentsInfo`.fk_class_id AS `studentsInfo_fk_class_id`
- FROM `studentsInfo`
执行原生命令可使用session.execute()方法执行,它将返回一个cursor游标对象,如下所示:
- # 获取链接池、ORM表对象
- import models
-
- cursor = models.session.execute(
- "SELECT * FROM studentsInfo WHERE id = (:uid)", params={'uid': 1})
-
- print(cursor.fetchall())
-
- # 关闭链接,亦可使用session.remove(),它将回收该链接
- models.session.close() # 获取链接池、ORM表对象
安装:pip install -U Flask-SQLAlchemy
Flask-SQLAlchemy 是基于 SQLAlchemy 开发的,在 Flask 中通过配置 SQLALCHEMY_POOL_SIZE 参数可以设置连接池的大小。连接池的大小决定了同时打开的数据库连接的数量。
- from flask import Flask
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
-
- app = Flask(__name__)
- app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///example.sqlite"
-
-
- class Base(DeclarativeBase):
- pass
-
-
- db = SQLAlchemy(app, model_class=Base)
-
-
- class User(db.Model):
- id: Mapped[int] = mapped_column(db.Integer, primary_key=True)
- username: Mapped[str] = mapped_column(db.String, unique=True, nullable=False)
-
-
- with app.app_context():
- db.create_all()
-
- db.session.add(User(username="example"))
- db.session.commit()
-
- users = db.session.execute(db.select(User)).scalars()

示例:设置连接池的大小为10:app.config['SQLALCHEMY_POOL_SIZE'] = 10
- from flask_sqlalchemy import SQLAlchemy
-
- app = Flask(__name__)
- app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/db_name'
- db = SQLAlchemy(app)
-
- # 创建模型类
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.String(50))
-
- # 添加数据到数据库
- user = User(name='John')
- db.session.add(user)
- db.session.commit()
-
- # 查询数据
- all_users = User.query.all()
-
- # 更新数据
- user = User.query.filter_by(name='John').first()
- user.name = 'Jane'
- db.session.commit()
-
- # 删除数据
- user = User.query.filter_by(name='Jane').first()
- db.session.delete(user)
- db.session.commit()

示例:
- import time
- from flask import Flask
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy import Sequence
-
- # 创建数据库实例
- app = Flask(__name__)
- # 配置数据库URI
- app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://用户名:密码@127.0.0.1:5432/数据库'
- app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
- # 查询时会显示原始SQL语句
- app.config['SQLALCHEMY_ECHO'] = True
-
- # 数据库对象
- db = SQLAlchemy(app)
-
-
- class VariableList(db.Model):
- __tablename__ = 'variable_list'
-
- # 设置id为自动增长的主键
- variable_id = db.Column(
- db.Integer, Sequence('variable_list_variable_id_seq'),
- autoincrement=True, primary_key=True, nullable=False
- )
- variable_name_en = db.Column(db.String(255), nullable=False, default='')
- variable_name_cn = db.Column(db.String(255), nullable=False, default='')
- variable_value = db.Column(db.Text, nullable=False, default='')
- variable_description = db.Column(db.String(255), nullable=False, default='')
- variable_tag_1 = db.Column(db.String(255), nullable=False, default='')
- variable_tag_2 = db.Column(db.String(255), nullable=False, default='')
- variable_tag_3 = db.Column(db.String(255), nullable=False, default='')
-
-
- with app.app_context():
- from sqlalchemy import text
- db.create_all()
- db.session.execute(text("ALTER SEQUENCE variable_list_variable_id_seq RESTART WITH 2000;"))
- d1 = VariableList()
- d1.variable_name_en = 'test'
- d1.variable_name_cn = '测试'
- db.session.add(d1)
- db.session.commit()

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。