当前位置:   article > 正文

SQLAIchemy 异步DBManager封装-02熟悉掌握

SQLAIchemy 异步DBManager封装-02熟悉掌握

一、引言

在上一篇文章中 SQLAIchemy 异步DBManager封装-01入门理解 我们深入讨论了SQLAlchemy异步DBManager整体的封装结构与思路。详细地介绍了如何封装添加和批量添加的操作方法,并通过实际示例进行了演示。SQL 全称是结构化查询语言,无疑查询是最复杂的部分。因此,在这篇文章中,我将详细介绍如何封装通用的数据库查询方法,并通过具体的示例来讲解这一过程,使得这一复杂的任务变得更为简单。

二、通用查询封装

指定主键id查询

class DBManager(metaclass=SingletonMetaCls):
    DB_CLIENT: SQLAlchemyManager = None
    orm_table: Type[BaseOrmTable] = None
    
    @with_session
    async def query_by_id(
            self,
            pk_id: int,
            *,
            orm_table: Type[BaseOrmTable] = None,
            session: AsyncSession = None,
    ) -> Union[T_BaseOrmTable, None]:
        """
        根据主键id查询
        Args:
            pk_id: 主键id
            orm_table: orm表映射类
            session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

        Returns:
            orm映射类的实例对象
        """
        orm_table = orm_table or self.orm_table
        ret = await session.get(orm_table, pk_id)
        return ret
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

这个封装很简单,直接看demo吧

class UserTable(BaseOrmTableWithTS):
    """用户表"""

    __tablename__ = "user"
    username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称")
    age: Mapped[int] = mapped_column(default=0, comment="年龄")
    password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码")
    phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号")
    email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱")
    avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像")



class UserManager(DBManager):
    orm_table = UserTable

    async def get_name_by_email(self, email):
        username = await self.query_one(cols=["username"], conds=[self.orm_table.email == email], flat=True)
        return username
        
async def query_demo():
    user = await UserManager().query_by_id(pk_id=1)
    print("user", user)

>>> out
user {'id': 1, 'username': 'hui', 'age': 18, 'password': '', 'phone': '', 'email': 'huidbk.163.com', 'avatar': '', 'created_at': datetime.datetime(2024, 4, 15, 1, 0, 43), 'updated_at': datetime.datetime(2024, 4, 15, 1, 0, 43)}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

查询单条

@with_session
async def _query(
        self,
        *,
        cols: list = None,
        orm_table: BaseOrmTable = None,
        conds: list = None,
        orders: list = None,
        limit: int = None,
        offset: int = 0,
        session: AsyncSession = None,
) -> Result[Any]:
    """
    通用查询
    Args:
        cols: 查询的列表字段
        orm_table: orm表映射类
        conds: 查询的条件列表
        orders: 排序列表, 默认id升序
        limit: 限制数量大小
        offset: 偏移量
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Returns: 查询结果集
        cursor_result
    """
    cols = cols or []
    cols = [column(col_obj) if isinstance(col_obj, str) else col_obj for col_obj in cols]  # 兼容字符串列表

    conditions = conds or []
    orders = orders or [column("id")]
    orm_table = orm_table or self.orm_table

    # 构造查询
    if cols:
        # 查询指定列
        query_sql = select(*cols).select_from(orm_table).where(*conditions).order_by(*orders)
    else:
        # 查询全部字段
        query_sql = select(orm_table).where(*conditions).order_by(*orders)

    if limit:
        query_sql = query_sql.limit(limit).offset(offset)

    # 执行查询
    cursor_result = await session.execute(query_sql)
    return cursor_result

@with_session
async def query_one(
        self,
        *,
        cols: list = None,
        orm_table: Type[BaseOrmTable] = None,
        conds: list = None,
        orders: list = None,
        flat: bool = False,
        session: AsyncSession = None,
) -> Union[dict, T_BaseOrmTable, Any]:
    """
    查询单行
    Args:
        cols: 查询的列表字段
        orm_table: orm表映射类
        conds: 查询的条件列表
        orders: 排序列表
        flat: 单字段时扁平化处理
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Examples:
        # 指定列名
        ret = await UserManager().query_one(cols=["username", "age"], conds=[UserTable.id == 1])
        sql => select username, age from user where id=1
        ret => {"username": "hui", "age": 18}

        # 指定列名,单字段扁平化处理
        ret = await UserManager().query_one(cols=["username"], conds=[UserTable.id == 1])
        sql => select username from user where id=1
        ret => {"username": "hui"} => "hui"

        # 计算总数
        ret = await UserManager().query_one(cols=[func.count()], flat=True)
        sql => select count(*) as count from user
        ret => {"count": 10} => 10

        # 不指定列名,查询全部字段, 返回表实例对象
        ret = await UserManager().query_one(conds=[UserTable.id == 1])
        sql => select id, username, age from user where id=1
        ret => UserTable(id=1, username="hui", age=18)

    Returns:
        Union[dict, BaseOrmTable(), Any(flat=True)]
    """
    cursor_result = await self._query(cols=cols, orm_table=orm_table, conds=conds, orders=orders, session=session)
    if cols:
        if flat and len(cols) == 1:
            # 单行单字段查询: 直接返回字段结果
            # eg: select count(*) as count from user 从 {"count": 100} => 100
            # eg: select username from user where id=1 从 {"username": "hui"} => "hui"
            return cursor_result.scalar_one()

        # eg: select username, age from user where id=1 => {"username": "hui", "age": 18}
        return cursor_result.mappings().one() or {}
    else:
        # 未指定列名查询默认全部字段,返回的是表实例对象 BaseOrmTable()
        # eg: select id, username, age from user where id=1 => UserTable(id=1, username="hui", age=18)
        return cursor_result.scalar_one()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107

查询无疑就只有两种结果单条、多条结果数据。这里统一封装一个 _query 通用查询方法,以供内部使用。

  • 支持指定查询的列(cols)
  • 条件查询(conds)
  • 排序(orders)
  • 分页(limit、offset)

主要封装就是利用 sqlaichemy 提供的 select 语法进行组织sql,通过 column 兼容列名字段字符串列表。query_one 方法,如果指定了 cols 返回字典格式,不指定则是库表映射类实例对象,一开始封装的时候我想统一出参都是返回 库表映射类实例对象 。

query_ret = cursor_result.mappings().one() or {}
return orm_table(**query_ret)
  • 1
  • 2

如果是 id as user_id 取别名查询会导致映射不上,但可以查询时不指定别名,orm_table_obj.to_dict(alias_dict={"id": "user_id"}) 时进行别名转换,还有一些flat 扁平化、统计数量的时候都不能使用 orm_table(**query_ret) 故而不好统一,再实际web场景中,出参还是要转成dict、json格式化进行响应,故而进行保留。看看具体使用效果

from sqlalchemy import String, func, label

async def query_demo():
    ret = await UserManager().query_one(cols=["username", "age"], conds=[UserTable.id == 1])
    print("指定列名 ret", ret)
    
    ret = await UserManager().query_one(
        cols=[UserTable.username, label("user_age", UserTable.age)], conds=[UserTable.id == 1]
    )
    print("取别名 ret", ret)

    ret = await UserManager().query_one(cols=["username"], conds=[UserTable.id == 1], flat=True)
    print("指定列名,单字段扁平化处理", ret)

    ret = await UserManager().query_one(cols=[func.count()], flat=True)
    print("计算总数", ret)

    ret = await UserManager().query_one(conds=[UserTable.id == 1])
    print("不指定列名,查询全部字段, 返回表实例对象", ret)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

查询结果

指定列名 ret {'username': 'hui', 'age': 18}

取别名 ret {'username': 'hui', 'user_age': 18}

指定列名,单字段扁平化处理 hui

计算总数 6

不指定列名,查询全部字段, 返回表实例对象 {'username': 'hui', 'age': 18, 'password': '', 'phone': '', 'email': 'huidbk.163.com', 'avatar': '', 'id': 1, 'created_at': datetime.datetime(2024, 4, 15, 1, 0, 43), 'updated_at': datetime.datetime(2024, 4, 15, 1, 0, 43)}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

查询多条

@with_session
async def query_all(
        self,
        *,
        cols: list = None,
        orm_table: BaseOrmTable = None,
        conds: list = None,
        orders: list = None,
        flat: bool = False,
        limit: int = None,
        offset: int = None,
        session: AsyncSession = None,
) -> Union[List[dict], List[T_BaseOrmTable], Any]:
    """
    查询多行
    Args:
        cols: 查询的列表字段
        orm_table: orm表映射类
        conds: 查询的条件列表
        orders: 排序列表
        flat: 单字段时扁平化处理
        limit: 限制数量大小
        offset: 偏移量
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务
    """
    cursor_result = await self._query(
        cols=cols, orm_table=orm_table, conds=conds, orders=orders, limit=limit, offset=offset, session=session
    )
    if cols:
        if flat and len(cols) == 1:
            # 扁平化处理
            # eg: select id from user 从 [{"id": 1}, {"id": 2}, {"id": 3}] => [1, 2, 3]
            return cursor_result.scalars().all()

        # eg: select username, age from user => [{"username": "hui", "age": 18}, [{"username": "dbk", "age": 18}]]
        return cursor_result.mappings().all() or []
    else:
        # 未指定列名查询默认全部字段,返回的是表实例对象 [BaseOrmTable()]
        # eg: select id, username, age from user
        # [User(id=1, username="hui", age=18), User(id=2, username="dbk", age=18)
        return cursor_result.scalars().all()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

查询多条与query_one一致内部调用 _query() 获取查询结果集,最后通过 cursor_result.mappings().all()cursor_result.scalars().all() 获取列表数据,同样支持单字段扁平化处理,还支持分页处理。具体看如下例子

from sqlalchemy import String, func, label, or_

ret = await UserManager().query_all()
user_ids = [user.id for user in ret]
print("查询全部", user_ids)

user_ids = await UserManager().query_all(cols=[UserTable.id], flat=True)
print("查询全部的用户id(扁平化处理)", user_ids)

ret = await UserManager().query_all(
    cols=[UserTable.username],
    conds=[
        UserTable.id > 1,
        or_(UserTable.age < 20, UserTable.email == "huidbk.163.com")
    ],
    orders=[UserTable.id],
    flat=True
)
# sql => select username from user where user.id > 1 and (age < 20 or email='huidbk.163.com') order by id
print("条件查询", ret)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

查询结果

查询全部 [1, 2, 3, 4, 5, 6]
查询全部的用户id(扁平化处理) [1, 2, 3, 4, 5, 6]
条件查询 ['zack']
  • 1
  • 2
  • 3

单字段扁平化处理,可以节省获取查询数据后再进行扁平化处理的一步操作。看看下面没有扁平化处理

user_infos = await UserManager().query_all(cols=[UserTable.id])
user_ids = [user_info.get("id") for user_info in user_infos]
print("查询全部的用户id", user_ids)

ret = await UserManager().query_one(cols=[func.count()])
count = ret.get("count") or 0
print("计算总数", count)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

上面的获取某业务的所有id,计算总数等,直接获取扁平化的结果,有时还是比较实用。

分页查询

async def list_page(
        self,
        *,
        cols: list = None,
        orm_table: BaseOrmTable = None,
        conds: list = None,
        orders: list = None,
        curr_page: int = 1,
        page_size: int = 20,
        session: AsyncSession = None,
):
    """
    单表通用分页查询
    Args:
        cols: 查询的列表字段
        orm_table: orm表映射类
        conds: 查询的条件列表
        orders: 排序列表
        curr_page: 页码
        page_size: 每页数量
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Returns: 
        total_count, data_list
    """
    conds = conds or []
    orders = orders or [column("id")]
    orm_table = orm_table or self.orm_table

    limit = page_size
    offset = (curr_page - 1) * page_size
    total_count, data_list = await asyncio.gather(
        self.query_one(
            cols=[func.count()], orm_table=orm_table, conds=conds, orders=orders, flat=True, session=session
        ),
        self.query_all(
            cols=cols, orm_table=orm_table, conds=conds, orders=orders, limit=limit, offset=offset, session=session
        ),
    )

    return total_count, data_list
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

这里分页查询就用 query_one 查询总数,query_all 分页查询,curr_page 当前页与 page_size 每页大小计算数据偏移量 offset,然后通过 asyncio.gather 并发执行获取结果。

total_count, data_list = await UserManager().list_page(
    cols=[UserTable.id, UserTable.username, UserTable.age],
    conds=[UserTable.id > 1],
    curr_page=2,
    page_size=3,
    orders=[desc(UserTable.age)]
)
print("分页查询 total_count", total_count)
print("分页查询 data_list", data_list)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

分页查询结果

分页查询 total_count 5
分页查询 data_list [{'id': 3, 'username': 'wang', 'age': 20}, {'id': 2, 'username': 'zack', 'age': 19}]
  • 1
  • 2

这里的分页查询没有使用 with_session 装饰器,由于 asyncio.gather 并发操作原因不能共享数据库会话 session,需要单独的 session,不然会报如下错误。

sqlalchemy.exc.InvalidRequestError:无法在上下文管理器内的已关闭事务上进行操作。 请先完成上下文管理器,然后再发出进一步的命令。

三、封装说明

SQL 的话还是查询用的多,查询也复杂,这里的话只封装了一些通用的查询操作,有一些分组查询、连表查询等我都没有封装,我认为这些操作还是写原生sql更直观一些,用ORM进行组装这些操作会感觉语法很别扭不简洁。如何执行原始sql,请看下一篇。SQLAIchemy 异步DBManager封装-03得心应手

四、Github源代码

源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/467294
推荐阅读
相关标签
  

闽ICP备14008679号