赞
踩
在前两篇文章中,我们详细介绍了SQLAlchemy异步DBManager的封装过程。第一篇文章帮助我们入门理解了整体的封装结构和思路,第二篇文章则帮助我们更加熟悉和掌握了这个封装的使用。我们已经介绍了添加和查询操作,并且对整体的封装思路有了深入的了解。
在本文中,我将继续扩展封装,介绍如何进行更新和删除操作。同时,我将演示如何执行原生的SQL语句,并介绍在异常情况下如何进行事务回滚的场景。这些内容将帮助我们更全面地应对各种数据库操作的需求。
from sqlalchemy import Result, column, delete, func, select, text, update @with_session async def update( self, values: dict, *, orm_table: Type[BaseOrmTable] = None, conds: list = None, session: AsyncSession = None, ): """ 更新数据 Args: values: 要更新的字段和对应的值,字典格式,例如 {"field1": value1, "field2": value2, ...} orm_table: ORM表映射类 conds: 更新条件列表,每个条件为一个表达式,例如 [UserTable.username == "hui", ...] session: 数据库会话对象,如果为 None,则在方法内部开启新的事务 Returns: 影响的行数 cursor_result.rowcount """ orm_table = orm_table or self.orm_table conds = conds or [] values = values or {} if not values: return sql = update(orm_table).where(*conds).values(**values) cursor_result = await session.execute(sql) return cursor_result.rowcount @with_session async def update_or_add( self, table_obj: [T_BaseOrmTable, dict], *, orm_table: Type[BaseOrmTable] = None, session: AsyncSession = None, **kwargs, ): """ 指定对象更新or添加数据 Args: table_obj: 映射类实例对象 or dict, e.g. UserTable(username="hui", age=18) or {"username": "hui", "v": 18, ...} orm_table: ORM表映射类 session: 数据库会话对象,如果为 None,则在方法内部开启新的事务 Returns: """ orm_table = orm_table or self.orm_table if isinstance(table_obj, dict): table_obj = orm_table(**table_obj) return await session.merge(table_obj, **kwargs)
class UserFileTable(BaseOrmTable): """用户文件表""" __tablename__ = "user_file" filename: Mapped[str] = mapped_column(String(100), default="", comment="文件名称") creator: Mapped[int] = mapped_column(default=0, comment="文件创建者") file_suffix: Mapped[str] = mapped_column(String(100), default="", comment="文件后缀") file_size: Mapped[int] = mapped_column(default=0, comment="文件大小") oss_key: Mapped[str] = mapped_column(String(100), default="", comment="oss key(minio)") is_del: Mapped[int] = mapped_column(default=0, comment="是否删除") deleted_at: Mapped[datetime] = mapped_column(nullable=True, comment="删除时间") class UserFileManager(DBManager): orm_table = UserFileTable async def update_demo(): ret = await UserFileManager().update(values={"filename": "hui"}, conds=[UserFileTable.id == 1]) print("update ret", ret) # 添加 user_file_info = {"filename": "huidbk", "oss_key": uuid.uuid4().hex} user_file: UserFileTable = await UserFileManager().update_or_add(table_obj=user_file_info) print("update_or_add add", user_file) # 更新 user_file.file_suffix = "png" user_file.file_size = 100 user_file.filename = "hui-update_or_add" ret: UserFileTable = await UserFileManager().update_or_add(table_obj=user_file) print("update_or_add update", ret)
@with_session async def bulk_delete_by_ids( self, pk_ids: list, *, orm_table: Type[BaseOrmTable] = None, logic_del: bool = False, logic_field: str = "deleted_at", logic_del_set_value: Any = None, session: AsyncSession = None, ): """ 根据主键id批量删除 Args: pk_ids: 主键id列表 orm_table: orm表映射类 logic_del: 逻辑删除,默认 False 物理删除 True 逻辑删除 logic_field: 逻辑删除字段 默认 deleted_at logic_del_set_value: 逻辑删除字段设置的值 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Returns: 删除的记录数 """ orm_table = orm_table or self.orm_table conds = [orm_table.id.in_(pk_ids)] return await self.delete( conds=conds, orm_table=orm_table, logic_del=logic_del, logic_field=logic_field, logic_del_set_value=logic_del_set_value, session=session, ) @with_session async def delete_by_id( self, pk_id: int, *, orm_table: Type[BaseOrmTable] = None, logic_del: bool = False, logic_field: str = "deleted_at", logic_del_set_value: Any = None, session: AsyncSession = None, ): """ 根据主键id删除 Args: pk_id: 主键id orm_table: orm表映射类 logic_del: 逻辑删除,默认 False 物理删除 True 逻辑删除 logic_field: 逻辑删除字段 默认 deleted_at logic_del_set_value: 逻辑删除字段设置的值 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Returns: 删除的记录数 """ orm_table = orm_table or self.orm_table conds = [orm_table.id == pk_id] return await self.delete( conds=conds, orm_table=orm_table, logic_del=logic_del, logic_field=logic_field, logic_del_set_value=logic_del_set_value, session=session, ) @with_session async def delete( self, *, conds: list = None, orm_table: Type[BaseOrmTable] = None, logic_del: bool = False, logic_field: str = "deleted_at", logic_del_set_value: Any = None, session: AsyncSession = None, ): """ 通用删除 Args: conds: 条件列表, e.g. [UserTable.id == 1] orm_table: orm表映射类 logic_del: 逻辑删除,默认 False 物理删除 True 逻辑删除 logic_field: 逻辑删除字段 默认 deleted_at logic_del_set_value: 逻辑删除字段设置的值 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Returns: 删除的记录数 """ orm_table = orm_table or self.orm_table if logic_del: # 执行逻辑删除操作 logic_del_info = dict() logic_del_info[logic_field] = logic_del_set_value or datetime.now() delete_stmt = update(orm_table).where(*conds).values(**logic_del_info) else: # 执行物理删除操作 delete_stmt = delete(orm_table).where(*conds) cursor_result = await session.execute(delete_stmt) # 返回影响的记录数 return cursor_result.rowcount
conds = [orm_table.id == pk_id]
,调用 delete 方法conds = [orm_table.id.in_(pk_ids)]
调用 delete 方法这两种删除操作都是通过调用 delete 方法实现的。默认情况下,这些操作执行的是物理删除。对于一些重要的数据,我们也可以选择执行逻辑删除。在逻辑删除中,默认使用 deleted_at
字段来记录删除时间。我们也可以指定具体的逻辑删除字段 logic_field
,以及逻辑字段的赋值情况 logic_del_set_value
,然后进行一个更新操作来实现逻辑删除。
如下是删除前的数据
async def delete_demo(): file_count = await UserFileManager().query_one(cols=[func.count()], flat=True) print("file_count", file_count) ret = await UserFileManager().delete_by_id(file_count) print("delete_by_id ret", ret) ret = await UserFileManager().bulk_delete_by_ids(pk_ids=[10, 11, 12]) print("bulk_delete_by_ids ret", ret) ret = await UserFileManager().delete(conds=[UserFileTable.id == 13]) print("delete ret", ret) ret = await UserFileManager().delete(conds=[UserFileTable.id == 5], logic_del=True) print("logic_del ret", ret) ret = await UserFileManager().delete( conds=[UserFileTable.id == 6], logic_del=True, logic_field="is_del", logic_del_set_value=1 ) print("logic_del set logic_field ret", ret)
删除结果展示
file_count 20
delete_by_id ret 0
bulk_delete_by_ids ret 3
delete ret 1
logic_del ret 1
logic_del set logic_field ret 1
主键id 为5、6的被逻辑删除了,10,11,12,13 被物理删除了。
@with_session async def run_sql( self, sql: str, *, params: dict = None, query_one: bool = False, session: AsyncSession = None ) -> Union[dict, List[dict]]: """ 执行并提交单条sql Args: sql: sql语句 params: sql参数, eg. {":id_val": 10, ":name_val": "hui"} query_one: 是否查询单条,默认False查询多条 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Returns: 执行sql的结果 """ sql = text(sql) cursor_result = await session.execute(sql, params) if query_one: return cursor_result.mappings().one() or {} else: return cursor_result.mappings().all() or []
内部执行sql时需要通过 sqlaichemy 的 text 函数转一下,然后根据 query_one 的值来确定查询单条还是多条。
async def run_raw_sql_demo(): """运行原生sql demo""" count_sql = "select count(*) as total_count from user_file" count_ret = await UserFileManager().run_sql(count_sql, query_one=True) print("count_ret", count_ret) data_sql = "select * from user_file where id > :id_val and file_size >= :file_size_val" params = {"id_val": 20, "file_size_val": 0} data_ret = await UserFileManager().run_sql(data_sql, params=params) print("dict data_ret", data_ret) data_sql = "select * from user_file where id > :id_val" data_ret = await UserFileManager().run_sql(sql=data_sql, params={"id_val": 4}) print("dict data_ret", data_ret) # 连表查询 data_sql = """ select user.id as user_id, username, user_file.id as file_id, filename, oss_key from user_file join user on user.id = user_file.creator where user_file.creator = :user_id """ data_ret = await UserFileManager().run_sql(data_sql, params={"user_id": 1}) print("join sql data_ret", data_ret)
需要注意的执行原生sql,sql参数的展位符是 :param_name 冒号后面接参数名称,然后参数对应的值则是字典形式组织。
查询结果如下
count_ret {'total_count': 16}
dict data_ret [{'id': 62, 'filename': 'aaa', 'creator': 0, 'file_suffix': '', 'file_size': 0, 'oss_key': '6dd01a72599e467eb3fcdd9b47e1de9c', 'is_del': 0, 'deleted_at': None}, ..]
dict data_ret [{'id': 5, 'filename': 'eee', 'creator': 0, 'file_suffix': '', 'file_size': 0, 'oss_key': '6892400cc83845aca89b2ebafc675471', 'is_del': 0, 'deleted_at': datetime.datetime(2024, 4, 16, 23, 56, 49)}, ...]
join sql data_ret [{'user_id': 1, 'username': 'hui', 'file_id': 1, 'filename': 'hui', 'oss_key': 'bbb'}]
async def create_and_transaction_demo(): async with UserFileManager.transaction() as session: await UserFileManager().bulk_add( table_objs=[{"filename": "aaa", "oss_key": uuid.uuid4().hex}], session=session ) user_file_obj = UserFileTable(filename="eee", oss_key=uuid.uuid4().hex) file_id = await UserFileManager().add(table_obj=user_file_obj, session=session) print("file_id", file_id) ret: UserFileTable = await UserFileManager().query_by_id(2, session=session) print("query_by_id", ret) # 异常回滚 a = 1 / 0 ret = await UserFileManager().query_one( cols=[UserFileTable.filename, UserFileTable.oss_key], conds=[UserFileTable.filename == "ccc"], session=session ) print("ret", ret)
这里通过 transaction() 获取事务会话 session,让后面的数据库操作都指定 session 参数,with_session 装饰器就不会再次构造,实现了共用一个 session,事务内的操作要么都成功要么都失败。
SQLAIchemyManager 设计
BaseOrmTable、TimestampColumns、BaseOrmTableWithTS 设计
transaction 上下文管理器(事务会话)
with_session 装饰器
orm_table 设计
DBManager 设计
查询扁平化 flat
字典与库表映射类实例
分页查询
逻辑删除
deleted_at
字段 or 指定逻辑字段进行逻辑删除,保留重要数据执行原生sql
到这就结束了,希望这些封装,可以满足各种复杂业务场景下的需求,提高数据库操作的灵活性和适用性,从而提高我们的开发效率。让代码变得更简单。
源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。
HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。