赞
踩
在上一篇文章中 SQLAIchemy 异步DBManager封装-01入门理解 我们深入讨论了SQLAlchemy异步DBManager整体的封装结构与思路。详细地介绍了如何封装添加和批量添加的操作方法,并通过实际示例进行了演示。SQL 全称是结构化查询语言,无疑查询是最复杂的部分。因此,在这篇文章中,我将详细介绍如何封装通用的数据库查询方法,并通过具体的示例来讲解这一过程,使得这一复杂的任务变得更为简单。
class DBManager(metaclass=SingletonMetaCls): DB_CLIENT: SQLAlchemyManager = None orm_table: Type[BaseOrmTable] = None @with_session async def query_by_id( self, pk_id: int, *, orm_table: Type[BaseOrmTable] = None, session: AsyncSession = None, ) -> Union[T_BaseOrmTable, None]: """ 根据主键id查询 Args: pk_id: 主键id orm_table: orm表映射类 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Returns: orm映射类的实例对象 """ orm_table = orm_table or self.orm_table ret = await session.get(orm_table, pk_id) return ret
这个封装很简单,直接看demo吧
class UserTable(BaseOrmTableWithTS): """用户表""" __tablename__ = "user" username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称") age: Mapped[int] = mapped_column(default=0, comment="年龄") password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码") phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号") email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱") avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像") class UserManager(DBManager): orm_table = UserTable async def get_name_by_email(self, email): username = await self.query_one(cols=["username"], conds=[self.orm_table.email == email], flat=True) return username async def query_demo(): user = await UserManager().query_by_id(pk_id=1) print("user", user) >>> out user {'id': 1, 'username': 'hui', 'age': 18, 'password': '', 'phone': '', 'email': 'huidbk.163.com', 'avatar': '', 'created_at': datetime.datetime(2024, 4, 15, 1, 0, 43), 'updated_at': datetime.datetime(2024, 4, 15, 1, 0, 43)}
@with_session async def _query( self, *, cols: list = None, orm_table: BaseOrmTable = None, conds: list = None, orders: list = None, limit: int = None, offset: int = 0, session: AsyncSession = None, ) -> Result[Any]: """ 通用查询 Args: cols: 查询的列表字段 orm_table: orm表映射类 conds: 查询的条件列表 orders: 排序列表, 默认id升序 limit: 限制数量大小 offset: 偏移量 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Returns: 查询结果集 cursor_result """ cols = cols or [] cols = [column(col_obj) if isinstance(col_obj, str) else col_obj for col_obj in cols] # 兼容字符串列表 conditions = conds or [] orders = orders or [column("id")] orm_table = orm_table or self.orm_table # 构造查询 if cols: # 查询指定列 query_sql = select(*cols).select_from(orm_table).where(*conditions).order_by(*orders) else: # 查询全部字段 query_sql = select(orm_table).where(*conditions).order_by(*orders) if limit: query_sql = query_sql.limit(limit).offset(offset) # 执行查询 cursor_result = await session.execute(query_sql) return cursor_result @with_session async def query_one( self, *, cols: list = None, orm_table: Type[BaseOrmTable] = None, conds: list = None, orders: list = None, flat: bool = False, session: AsyncSession = None, ) -> Union[dict, T_BaseOrmTable, Any]: """ 查询单行 Args: cols: 查询的列表字段 orm_table: orm表映射类 conds: 查询的条件列表 orders: 排序列表 flat: 单字段时扁平化处理 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Examples: # 指定列名 ret = await UserManager().query_one(cols=["username", "age"], conds=[UserTable.id == 1]) sql => select username, age from user where id=1 ret => {"username": "hui", "age": 18} # 指定列名,单字段扁平化处理 ret = await UserManager().query_one(cols=["username"], conds=[UserTable.id == 1]) sql => select username from user where id=1 ret => {"username": "hui"} => "hui" # 计算总数 ret = await UserManager().query_one(cols=[func.count()], flat=True) sql => select count(*) as count from user ret => {"count": 10} => 10 # 不指定列名,查询全部字段, 返回表实例对象 ret = await UserManager().query_one(conds=[UserTable.id == 1]) sql => select id, username, age from user where id=1 ret => UserTable(id=1, username="hui", age=18) Returns: Union[dict, BaseOrmTable(), Any(flat=True)] """ cursor_result = await self._query(cols=cols, orm_table=orm_table, conds=conds, orders=orders, session=session) if cols: if flat and len(cols) == 1: # 单行单字段查询: 直接返回字段结果 # eg: select count(*) as count from user 从 {"count": 100} => 100 # eg: select username from user where id=1 从 {"username": "hui"} => "hui" return cursor_result.scalar_one() # eg: select username, age from user where id=1 => {"username": "hui", "age": 18} return cursor_result.mappings().one() or {} else: # 未指定列名查询默认全部字段,返回的是表实例对象 BaseOrmTable() # eg: select id, username, age from user where id=1 => UserTable(id=1, username="hui", age=18) return cursor_result.scalar_one()
查询无疑就只有两种结果单条、多条结果数据。这里统一封装一个 _query 通用查询方法,以供内部使用。
主要封装就是利用 sqlaichemy 提供的 select 语法进行组织sql,通过 column 兼容列名字段字符串列表。query_one 方法,如果指定了 cols 返回字典格式,不指定则是库表映射类实例对象,一开始封装的时候我想统一出参都是返回 库表映射类实例对象 。
query_ret = cursor_result.mappings().one() or {}
return orm_table(**query_ret)
如果是 id as user_id 取别名查询会导致映射不上,但可以查询时不指定别名,orm_table_obj.to_dict(alias_dict={"id": "user_id"})
时进行别名转换,还有一些flat 扁平化、统计数量的时候都不能使用 orm_table(**query_ret)
故而不好统一,再实际web场景中,出参还是要转成dict、json格式化进行响应,故而进行保留。看看具体使用效果
from sqlalchemy import String, func, label async def query_demo(): ret = await UserManager().query_one(cols=["username", "age"], conds=[UserTable.id == 1]) print("指定列名 ret", ret) ret = await UserManager().query_one( cols=[UserTable.username, label("user_age", UserTable.age)], conds=[UserTable.id == 1] ) print("取别名 ret", ret) ret = await UserManager().query_one(cols=["username"], conds=[UserTable.id == 1], flat=True) print("指定列名,单字段扁平化处理", ret) ret = await UserManager().query_one(cols=[func.count()], flat=True) print("计算总数", ret) ret = await UserManager().query_one(conds=[UserTable.id == 1]) print("不指定列名,查询全部字段, 返回表实例对象", ret)
查询结果
指定列名 ret {'username': 'hui', 'age': 18}
取别名 ret {'username': 'hui', 'user_age': 18}
指定列名,单字段扁平化处理 hui
计算总数 6
不指定列名,查询全部字段, 返回表实例对象 {'username': 'hui', 'age': 18, 'password': '', 'phone': '', 'email': 'huidbk.163.com', 'avatar': '', 'id': 1, 'created_at': datetime.datetime(2024, 4, 15, 1, 0, 43), 'updated_at': datetime.datetime(2024, 4, 15, 1, 0, 43)}
@with_session async def query_all( self, *, cols: list = None, orm_table: BaseOrmTable = None, conds: list = None, orders: list = None, flat: bool = False, limit: int = None, offset: int = None, session: AsyncSession = None, ) -> Union[List[dict], List[T_BaseOrmTable], Any]: """ 查询多行 Args: cols: 查询的列表字段 orm_table: orm表映射类 conds: 查询的条件列表 orders: 排序列表 flat: 单字段时扁平化处理 limit: 限制数量大小 offset: 偏移量 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 """ cursor_result = await self._query( cols=cols, orm_table=orm_table, conds=conds, orders=orders, limit=limit, offset=offset, session=session ) if cols: if flat and len(cols) == 1: # 扁平化处理 # eg: select id from user 从 [{"id": 1}, {"id": 2}, {"id": 3}] => [1, 2, 3] return cursor_result.scalars().all() # eg: select username, age from user => [{"username": "hui", "age": 18}, [{"username": "dbk", "age": 18}]] return cursor_result.mappings().all() or [] else: # 未指定列名查询默认全部字段,返回的是表实例对象 [BaseOrmTable()] # eg: select id, username, age from user # [User(id=1, username="hui", age=18), User(id=2, username="dbk", age=18) return cursor_result.scalars().all()
查询多条与query_one一致内部调用 _query() 获取查询结果集,最后通过 cursor_result.mappings().all()
、cursor_result.scalars().all()
获取列表数据,同样支持单字段扁平化处理,还支持分页处理。具体看如下例子
from sqlalchemy import String, func, label, or_ ret = await UserManager().query_all() user_ids = [user.id for user in ret] print("查询全部", user_ids) user_ids = await UserManager().query_all(cols=[UserTable.id], flat=True) print("查询全部的用户id(扁平化处理)", user_ids) ret = await UserManager().query_all( cols=[UserTable.username], conds=[ UserTable.id > 1, or_(UserTable.age < 20, UserTable.email == "huidbk.163.com") ], orders=[UserTable.id], flat=True ) # sql => select username from user where user.id > 1 and (age < 20 or email='huidbk.163.com') order by id print("条件查询", ret)
查询结果
查询全部 [1, 2, 3, 4, 5, 6]
查询全部的用户id(扁平化处理) [1, 2, 3, 4, 5, 6]
条件查询 ['zack']
单字段扁平化处理,可以节省获取查询数据后再进行扁平化处理的一步操作。看看下面没有扁平化处理
user_infos = await UserManager().query_all(cols=[UserTable.id])
user_ids = [user_info.get("id") for user_info in user_infos]
print("查询全部的用户id", user_ids)
ret = await UserManager().query_one(cols=[func.count()])
count = ret.get("count") or 0
print("计算总数", count)
上面的获取某业务的所有id,计算总数等,直接获取扁平化的结果,有时还是比较实用。
async def list_page( self, *, cols: list = None, orm_table: BaseOrmTable = None, conds: list = None, orders: list = None, curr_page: int = 1, page_size: int = 20, session: AsyncSession = None, ): """ 单表通用分页查询 Args: cols: 查询的列表字段 orm_table: orm表映射类 conds: 查询的条件列表 orders: 排序列表 curr_page: 页码 page_size: 每页数量 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Returns: total_count, data_list """ conds = conds or [] orders = orders or [column("id")] orm_table = orm_table or self.orm_table limit = page_size offset = (curr_page - 1) * page_size total_count, data_list = await asyncio.gather( self.query_one( cols=[func.count()], orm_table=orm_table, conds=conds, orders=orders, flat=True, session=session ), self.query_all( cols=cols, orm_table=orm_table, conds=conds, orders=orders, limit=limit, offset=offset, session=session ), ) return total_count, data_list
这里分页查询就用 query_one 查询总数,query_all 分页查询,curr_page 当前页与 page_size 每页大小计算数据偏移量 offset,然后通过 asyncio.gather 并发执行获取结果。
total_count, data_list = await UserManager().list_page(
cols=[UserTable.id, UserTable.username, UserTable.age],
conds=[UserTable.id > 1],
curr_page=2,
page_size=3,
orders=[desc(UserTable.age)]
)
print("分页查询 total_count", total_count)
print("分页查询 data_list", data_list)
分页查询结果
分页查询 total_count 5
分页查询 data_list [{'id': 3, 'username': 'wang', 'age': 20}, {'id': 2, 'username': 'zack', 'age': 19}]
这里的分页查询没有使用 with_session 装饰器,由于 asyncio.gather 并发操作原因不能共享数据库会话 session,需要单独的 session,不然会报如下错误。
sqlalchemy.exc.InvalidRequestError:无法在上下文管理器内的已关闭事务上进行操作。 请先完成上下文管理器,然后再发出进一步的命令。
SQL 的话还是查询用的多,查询也复杂,这里的话只封装了一些通用的查询操作,有一些分组查询、连表查询等我都没有封装,我认为这些操作还是写原生sql更直观一些,用ORM进行组装这些操作会感觉语法很别扭不简洁。如何执行原始sql,请看下一篇。SQLAIchemy 异步DBManager封装-03得心应手
源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。
HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。