赞
踩
SQLAlchemy 是一个强大的 Python SQL 工具包和对象关系映射(ORM)系统,是业内比较流行的ORM,设计非常优雅。随着其2.0版本的发布,SQLAlchemy 引入了原生的异步支持,这极大地增强了其在处理高并发和异步I/O场景下的能力。通过结合像greenlet、gevent这样的协程库,SQLAlchemy 使得异步数据库操作成为可能,从而提高了应用程序的性能和响应速度。
这里我将基于SQLAlchemy的异步支持,封装一些常用的增删改查(CRUD)操作到 https://github.com/HuiDBK/py-tools 中,以便在项目开发中更加便捷地使用。
Github: https://github.com/sqlalchemy/sqlalchemy
2.0文档:https://docs.sqlalchemy.org/en/20/index.html
封装前,先简单介绍下如何使用 SQLAIchemy。
具体细节可以参考官网文档:https://docs.sqlalchemy.org/en/20/orm/quickstart.html
pip install sqlalchemy[asyncio]==2.0.20
pip install aiomysql==0.2.0
这里安装了 sqlalchemy 2.0版本,以及 aiomysql 异步数据库驱动,进行演示。
from sqlalchemy.ext.asyncio import create_async_engine
# db_uri = "{protocol}://{user}:{password}@{host}:{port}/{db}"
db_engine = create_async_engine("mysql+aiomysql://root:123456@127.0.0.1:3306/demo")
from sqlalchemy import String from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column class BaseOrmTable(DeclarativeBase): """SQLAlchemy Base ORM Model""" __abstract__ = True id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="主键ID") class UserTable(BaseOrmTable): """用户表""" __tablename__ = "user" username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称") password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码") phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号") email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱") avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像")
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column # db_uri = "{protocol}://{user}:{password}@{host}:{port}/{db}" db_engine = create_async_engine("mysql+aiomysql://root:123456@127.0.0.1:3306/hui-demo") Session = async_sessionmaker(db_engine) async def create_tables(): # 根据映射创建库表 async with db_engine.begin() as conn: await conn.run_sync(BaseOrmTable.metadata.create_all) async def main(): await create_tables() async with Session.begin() as session: # 添加用户 new_user = UserTable(username='hui', email='huidbk@163.com') session.add(new_user) await session.flush() # 刷新table 对象属性,获取新增的id print(new_user.id) print("add user", new_user.__dict__) # 获取用户 user = await session.get(UserTable, new_user.id) print("get user", user.__dict__) # 更新用户 user.email = 'hui@163.com' await session.merge(user) print("updated user", user.__dict__) # 删除用户 await session.delete(user) if __name__ == '__main__': # 运行主函数 asyncio.run(main())
class SQLAlchemyManager(metaclass=SingletonMetaCls): DB_URL_TEMPLATE = "{protocol}://{user}:{password}@{host}:{port}/{db}" def __init__( self, host: str = "localhost", port: int = 3306, user: str = "", password: str = "", db_name: str = "", pool_size: int = 30, pool_pre_ping: bool = True, pool_recycle: int = 600, log: Union[logging.Logger] = None, ): self.host = host self.port = port self.user = user self.password = password self.db_name = db_name self.pool_size = pool_size self.pool_pre_ping = pool_pre_ping self.pool_recycle = pool_recycle self.log = log or logger self.db_engine: AsyncEngine = None self.async_session_maker: async_sessionmaker = None def get_db_url(self, protocol: str = "mysql+aiomysql"): db_url = self.DB_URL_TEMPLATE.format( protocol=protocol, user=self.user, password=self.password, host=self.host, port=self.port, db=self.db_name ) return db_url def init_db_engine(self, protocol: str): """ 初始化db引擎 Args: protocol: 驱动协议类型 Returns: self.db_engine """ db_url = self.get_db_url(protocol=protocol) self.log.info(f"init_db_engine => {db_url}") self.db_engine = create_async_engine( url=db_url, pool_size=self.pool_size, pool_pre_ping=self.pool_pre_ping, pool_recycle=self.pool_recycle ) self.async_session_maker = async_sessionmaker(bind=self.db_engine, expire_on_commit=False) return self.db_engine def init_mysql_engine(self, protocol: str = "mysql+aiomysql"): """ 初始化mysql引擎 Args: protocol: 驱动协议类型 Returns: self.db_engine """ return self.init_db_engine(protocol=protocol)
SQLAlchemyManager 主要封装一些数据库账户配置信息、连接池信息。
pool_size(连接池大小): 指定连接池中允许保持的最大连接数。当应用程序需要访问数据库时,连接池会维护一定数量的数据库连接,以便快速地响应请求。通常情况下,pool_size 的值应该根据应用程序的并发访问量和数据库的性能来进行调整。
pool_pre_ping(预检查连接): 指定是否在数据库连接被使用前对连接进行预检查。预检查可以确保连接处于活动状态,并且可以自动重新连接到数据库服务器,以防止连接由于长时间空闲而失效。启用预检查可以提高应用程序对数据库的可靠性和稳定性。
pool_recycle(连接回收时间): 指定数据库连接在被重新使用之前的最大空闲时间。当连接空闲时间超过 pool_recycle 设置的值时,连接将被关闭并重新创建,以防止连接长时间处于空闲状态而导致的连接问题。pool_recycle 的值通常设置为一个较小的时间间隔,以确保连接能够及时地得到回收和重建,从而提高连接的健壮性和性能。
init_db_engine
方法则是初始化数据库引擎,内部根据数据库配置信息
from datetime import datetime from sqlalchemy import func from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column class BaseOrmTable(AsyncAttrs, DeclarativeBase): """SQLAlchemy Base ORM Model""" __abstract__ = True id: Mapped[int] = mapped_column(primary_key=True, comment="主键ID") def __repr__(self): return str(self.to_dict()) def to_dict(self, alias_dict: dict = None, exclude_none=True) -> dict: """ 数据库模型转成字典 Args: alias_dict: 字段别名字典 eg: {"id": "user_id"}, 把id名称替换成 user_id exclude_none: 默认排查None值 Returns: dict """ alias_dict = alias_dict or {} if exclude_none: return { alias_dict.get(c.name, c.name): getattr(self, c.name) for c in self.__table__.columns if getattr(self, c.name) is not None } else: return { alias_dict.get(c.name, c.name): getattr(self, c.name, None) for c in self.__table__.columns } class TimestampColumns(AsyncAttrs, DeclarativeBase): """时间戳相关列""" __abstract__ = True created_at: Mapped[datetime] = mapped_column(default=datetime.now, comment="创建时间") updated_at: Mapped[datetime] = mapped_column(default=datetime.now, onupdate=datetime.now, comment="更新时间") deleted_at: Mapped[datetime] = mapped_column(nullable=True, comment="删除时间") class BaseOrmTableWithTS(BaseOrmTable, TimestampColumns): __abstract__ = True
创建一些基础的 ORM 类,以便后续的映射类可以继承并且共享一些公有属性和方法。
BaseOrmTable
类:
AsyncAttrs
和 DeclarativeBase
。这样做使得 BaseOrmTable
类具有了异步属性访问的能力,为异步编程提供便利,特别是在异步环境中访问具有延迟加载或者异步加载特性的属性。to_dict
方法,用于将数据库模型转换为字典。它支持通过参数 alias_dict
指定字段别名,并且可以选择是否排除值为 None 的属性。TimestampColumns
类:
created_at
和 updated_at
默认使用 datetime.now
函数来自动记录当前时间,deleted_at
则允许为空,用于标记数据的删除时间(可用作于逻辑删除)BaseOrmTableWithTS
类:
BaseOrmTable
和 TimestampColumns
,实际上是一个组合类,集成了基础的 ORM 功能和时间戳相关的列。BaseOrmTable
和 TimestampColumns
,使得后续的映射类只需要继承这个类,就能够拥有基础的 ORM 功能和时间戳相关的列。通过这种封装,你可以在后续的数据库映射类中更加专注于业务逻辑的实现,而不需要重复编写基础的 ORM 功能和时间戳相关的列,提高了代码的重用性和可维护性。
from typing import Any, List, Type, TypeVar, Union from py_tools.connections.db.mysql import BaseOrmTable from py_tools.meta_cls import SingletonMetaCls # 泛指 BaseOrmTable 所有子类实例对象类型 T_BaseOrmTable = TypeVar("T_BaseOrmTable", bound=BaseOrmTable) T_Hints = TypeVar("T_Hints") # 用于修复被装饰的函数参数提示,让IDE有类型提示 def with_session(method) -> T_Hints: """ 兼容事务会话 Args: method: orm 的 crud Notes: 方法中没有带事务连接则,则构造 Returns: """ @functools.wraps(method) async def wrapper(db_manager, *args, **kwargs): session = kwargs.get("session") or None if session: return await method(db_manager, *args, **kwargs) else: async with db_manager.transaction() as session: kwargs["session"] = session return await method(db_manager, *args, **kwargs) return wrapper
这里我提供了一个 with_session 装饰器,用于在需要数据库会话(事务)的数据库操作方法中自动开启事务,由于 sqlaichemy 官方推荐每个数据库操作都手动开启事务会话(自动提交),装饰器的设计没有时则构造,有则共享,这样不但可以减少冗余 async with db_manager.transaction() as session 的代码,也可以兼容多个操作共享同一个 session 有问题时进行事务回滚。
由于给方法加了通用的装饰器导致一些版本的IDE无法识别方法真实的签名,使用时会出现不知道方法的入参是什么,对于开发者来说是极其不方便的。
使用 typing 的 TypeVar 自定义类型来构造一个通用的泛型来当作函数返回的类型,进而修复。
from typing import TypeVar
T_Hints = TypeVar("T_Hints") # 用于修复被装饰的函数参数提示,让IDE有类型提示
def with_session(method) -> T_Hints:
...
这里PyCharm 2023.2.4 版本升级到 2024.1 就有提示了,IDE修复了,可以不用 T_Hints 了。
一些旧版本构造 sqlaichemy 的库表对象时也会出现不知道类对象属性入参提示,升级到最新版本都解决了。
from contextlib import asynccontextmanager class DBManager(metaclass=SingletonMetaCls): DB_CLIENT: SQLAlchemyManager = None orm_table: Type[BaseOrmTable] = None @classmethod def init_db_client(cls, db_client: SQLAlchemyManager): cls.DB_CLIENT = db_client return cls.DB_CLIENT @classmethod @asynccontextmanager async def transaction(cls): """事务上下文管理器""" async with cls.DB_CLIENT.async_session_maker.begin() as session: yield session @classmethod @asynccontextmanager async def connection(cls) -> AsyncIterator[AsyncConnection]: """数据库引擎连接上下文管理器""" async with cls.DB_CLIENT.db_engine.begin() as conn: yield conn
class DBManager(metaclass=SingletonMetaCls): DB_CLIENT: SQLAlchemyManager = None orm_table: Type[BaseOrmTable] = None @with_session async def bulk_add( self, table_objs: List[Union[T_BaseOrmTable, dict]], *, orm_table: Type[BaseOrmTable] = None, flush: bool = False, session: AsyncSession = None ) -> List[T_BaseOrmTable]: """ 批量插入 Args: table_objs: orm映射类实例列表 eg.[UserTable(username="hui", age=18), ...] or [{"username": "hui", "age": 18}, ...] orm_table: orm表映射类 flush: 刷新对象状态,默认不刷新 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Returns: 成功插入的对象列表 """ orm_table = orm_table or self.orm_table if all(isinstance(table_obj, dict) for table_obj in table_objs): # 字典列表转成orm映射类实例列表处理 table_objs = [orm_table(**table_obj) for table_obj in table_objs] session.add_all(table_objs) if flush: await session.flush(table_objs) return table_objs @with_session async def add( self, table_obj: [T_BaseOrmTable, dict], *, orm_table: Type[BaseOrmTable] = None, session: AsyncSession = None ) -> int: """ 插入一条数据 Args: table_obj: orm映射类实例对象, eg. UserTable(username="hui", age=18) or {"username": "hui", "age": 18} orm_table: orm表映射类 session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务 Returns: 新增的id table_obj.id """ orm_table = orm_table or self.orm_table if isinstance(table_obj, dict): table_obj = orm_table(**table_obj) session.add(table_obj) await session.flush(objects=[table_obj]) # 刷新对象状态,获取新增的id return table_obj.id
这里就是用 session.add 与 add_all 方法封装了数据库添加、批量添加的操作,封装的点主要在于除了 orm_table 实例对象入参还支持字典入参,内部还是转换成库表映射类实例来操作,最后通过 session.flush 方法,单个添加返回新增的主键id,批量添加则是返回实例对象列表。
设计的方法中有一个 * 号是参数的分隔符,它的作用是将其前面的参数声明为位置参数,而将 * 后面的参数声明为关键字参数,* 号后面的参数入参只能使用关键字形式的入参,我在很多的开源库中都看到了这样的设计,可以把一些函数语义连贯、常用必传的参数设置为位置参数,其他的则是关键字参数。这样可以明确参数的作用、提高函数的可读性、防止参数错误等。
具体看下使用案例:
import asyncio from sqlalchemy import String from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column from py_tools.connections.db.mysql import BaseOrmTableWithTS, BaseOrmTable, DBManager, SQLAlchemyManager class UserTable(BaseOrmTableWithTS): """用户表""" __tablename__ = "user" username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称") password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码") phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号") email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱") avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像") async def create_tables(): # 根据映射创建库表(异步) # async with db_engine.begin() as conn: # await conn.run_sync(BaseOrmTable.metadata.create_all) async with DBManager.connection() as conn: await conn.run_sync(BaseOrmTable.metadata.create_all) async def init_orm_manager(): db_client = SQLAlchemyManager( host="127.0.0.1", port=3306, user="root", password="123456", db_name="hui-demo", ) db_client.init_mysql_engine() DBManager().init_db_client(db_client) async def manager_crud(): user = {"username": "hui", "email": "huidbk.163.com"} user_id = await DBManager().add(table_obj=user, orm_table=UserTable) print("user_id", user_id) users = [ {"username": "zack", "email": "zack.163.com"}, {"username": "wang", "email": "wang.163.com"} ] add_users = await DBManager().bulk_add(table_objs=users, orm_table=UserTable) add_user_ids = [user.id for user in add_users] print("add_user_ids", add_user_ids) async def main(): await create_tables() # await normal_crud() await init_orm_manager() await manager_crud() if __name__ == '__main__': # 运行主函数 asyncio.run(main())
在程序启动时初始化好DBManager 的 DB_CLIENT 就可以直接使用封装的方法,主要就是 DB_CLIENT 作为类属性,后面DBManager 实例与子类实例对象都可以共享这个数据库引擎。但我这里还是不推荐上面的写法,DBManager 是一些通用的DB操作,而具体一些业务操作还是单独封装一些DB业务Manager类来进行会比较好,更利于扩展维护与复用。
class UserManager(DBManager): orm_table = UserTable async def get_name_by_email(self, email): username = await self.query_one(cols=["username"], conds=[self.orm_table.email == email], flat=True) return username async def manager_crud(): # demo 2 (推荐) user = UserTable(username="hui-test01", email="hui-test01.163.com") user_id = await UserManager().add(table_obj=user) print("user_id", user_id) users = [ UserTable(username="hui-test02", email="hui-test02.163.com"), UserTable(username="hui-test03", email="hui-test03.163.com"), ] add_users = await UserManager().bulk_add(table_objs=users) add_user_ids = [user.id for user in add_users] print("add_user_ids", add_user_ids) username = await UserManager().get_name_by_email(email="huidbk.163.com") print("username", username) >>> out user_id 4 add_user_ids [5, 6] username hui
这里 UserManager 单独封装的 get_name_by_email 的方法就是业务中常用查询操作通过邮件获取用户名称,这里就是举一个简单的例子,具体DB业务具体封装而不是全部写在逻辑层,这样别人要用的时候就不用重新组织条件参数、上下文,而是简单传递业务参数进行复用获取数据。
UserManager 调用 add、bulk_add 等方法时也不用像 DBManager 指定 orm_table 参数,使用起来更简洁。具体是因为 UserManager 类指定了 类属性 orm_table = UserTable,再封装时有一句 orm_table = orm_table or self.orm_table 意思就是优先选择入参的orm_table,没有则是 self.orm_table (具体实例对象的orm_table)。这样写也体现出 封装、继承的灵活性。
这里也引出了另一个封装方法 query_one 查询单条数据。由于介绍了一些Demo如果把所有的封装方法混合到一起篇幅就太长,故而我准备分成三篇进行分别介绍,这样也更好阅读。
源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。
HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。