赞
踩
最近在对SQLModel做一些封装,本篇文章主要记录封装过程中的一些思路和实战代码。
from typing import Optional from sqlmodel import Field, SQLModel, create_engine, Session, select, or_ class Hero(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) name: str secret_name: str age: Optional[int] = None # 创建数据库引擎 sqlite_url = "mysql+pymysql://root:root@127.0.0.1:3306/fastzdp_sqlmodel?charset=utf8mb4" engine = create_engine(sqlite_url, echo=True) # 创建所有表 SQLModel.metadata.drop_all(engine) SQLModel.metadata.create_all(engine) # 新增 hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson") hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador") hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48) hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32) hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35) hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36) hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93) with Session(engine) as session: session.add(hero_1) session.add(hero_2) session.add(hero_3) session.add(hero_4) session.add(hero_5) session.add(hero_6) session.add(hero_7) session.commit() # or 查询 with Session(engine) as session: statement = select(Hero).where(or_(Hero.age <= 35, Hero.age > 90)) results = session.exec(statement) for hero in results: print(hero)
这个代码用起来虽然已经比较简单了,但是我觉得还可以更简单,更灵活,所以想要进一步封装一个方法,能够轻松的支持or查询。
利用反射的一些特性,我初步做了如下方法的一个封装。
def get_by_dict_or(engine, model, query_dict: dict): """ 根据查询字典指定的键值对查询数据,键值对之间的关系是or的关系 :param engine: 引擎对象 :param model: 模型类 :param query_dict: 查询字典 {age:{gt:3,lt:33}} """ with Session(engine) as session: query = select(model) if isinstance(query_dict, dict): for k, v in query_dict.items(): if hasattr(model, k): if isinstance(v, dict): # or_(Hero.age <= 35, Hero.age > 90) mk = getattr(model, k) conditions = [] for kk, vv in v.items(): if kk == "==": conditions.append(mk == vv) elif kk == ">": conditions.append(mk > vv) elif kk == "<": conditions.append(mk < vv) elif kk == ">=": conditions.append(mk >= vv) elif kk == "<=": conditions.append(mk <= vv) query = query.where(or_(*conditions)) results = session.exec(query) return results.all()
这个方法能不能行呢?
先写个简单的测试代码测一下。
from typing import Optional from sqlmodel import Field, SQLModel import fastzdp_sqlmodel as fsqlmodel class User(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) name: str age: Optional[int] = None # 创建数据库引擎 engine = fsqlmodel.get_engine(database="fastzdp_sqlmodel") # 初始化表格 fsqlmodel.init_table(engine) # 创建数据 u = User(name="张三", age=23) fsqlmodel.add(engine, u) u = User(name="张三", age=33) fsqlmodel.add(engine, u) u = User(name="李四", age=24) fsqlmodel.add(engine, u) # 查询年龄等于23或者小于10的用户 query = {"age": {">": 23, "<": 10}} users = fsqlmodel.get_by_dict_or(engine, User, query) print(users)
执行结果如下:
初步来看是比较符合预期的。
这种or字典查询的方式看上去比较易懂,能不能支持and查询呢。
比如 {age:{>:20, <:30}}
不就是一个非常容易看懂的range查询吗?
所以我决定对上面的这个方法做一个改造,封装另一个方法。
将之前的方法复制过来改个名就行了。
def get_by_dict_and(engine, model, query_dict: dict): """ 根据查询字典指定的键值对查询数据,键值对之间的关系是and的关系 :param engine: 引擎对象 :param model: 模型类 :param query_dict: 查询字典 {age:{gt:3,lt:33}} """ with Session(engine) as session: query = select(model) if isinstance(query_dict, dict): for k, v in query_dict.items(): if hasattr(model, k): if isinstance(v, dict): # or_(Hero.age <= 35, Hero.age > 90) mk = getattr(model, k) conditions = [] for kk, vv in v.items(): if kk == "==": conditions.append(mk == vv) elif kk == ">": conditions.append(mk > vv) elif kk == "<": conditions.append(mk < vv) elif kk == ">=": conditions.append(mk >= vv) elif kk == "<=": conditions.append(mk <= vv) query = query.where(*conditions) results = session.exec(query) return results.all()
也是在上一个测试代码的基础上稍微改造一下。
from typing import Optional from sqlmodel import Field, SQLModel import fastzdp_sqlmodel as fsqlmodel class User(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) name: str age: Optional[int] = None # 创建数据库引擎 engine = fsqlmodel.get_engine(database="fastzdp_sqlmodel") # 初始化表格 fsqlmodel.init_table(engine) # 创建数据 u = User(name="张三", age=23) fsqlmodel.add(engine, u) u = User(name="张三", age=33) fsqlmodel.add(engine, u) u = User(name="李四", age=24) fsqlmodel.add(engine, u) # 查询年龄等于20或者小于30的用户 query = {"age": {">": 20, "<": 30}} users = fsqlmodel.get_by_dict_and(engine, User, query) print(users)
输出结果如下:
初步来看,也是非常符合预期的。
实际上上面的代码还有不少遗留的问题,比如我想要查询名字是张三或者年龄大于20的怎么查询呢?
不过我们有了初步的方法以后,后面就是增加功能和优化了,暂时不急。
我决定先把整个框架的大体架子给搭建起来。
from typing import Optional from sqlmodel import Field, SQLModel, create_engine, Session, select, or_ class Hero(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) name: str = Field(index=True) secret_name: str age: int | None = Field(default=None, index=True) # 创建数据库引擎 sqlite_url = "mysql+pymysql://root:root@127.0.0.1:3306/fastzdp_sqlmodel?charset=utf8mb4" engine = create_engine(sqlite_url, echo=True) # 创建所有表 SQLModel.metadata.drop_all(engine) SQLModel.metadata.create_all(engine) # 新增 hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson") hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador") hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48) hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32) hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35) hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36) hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93) with Session(engine) as session: session.add(hero_1) session.add(hero_2) session.add(hero_3) session.add(hero_4) session.add(hero_5) session.add(hero_6) session.add(hero_7) session.commit() # 查询 with Session(engine) as session: statement = select(Hero).where(Hero.id == 1) results = session.exec(statement) hero = results.first() print("Hero:", hero)
from typing import Optional from sqlmodel import Field, SQLModel, create_engine, Session, select, or_ class Hero(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) name: str = Field(index=True) secret_name: str age: int | None = Field(default=None, index=True) # 创建数据库引擎 sqlite_url = "mysql+pymysql://root:root@127.0.0.1:3306/fastzdp_sqlmodel?charset=utf8mb4" engine = create_engine(sqlite_url, echo=True) # 创建所有表 SQLModel.metadata.drop_all(engine) SQLModel.metadata.create_all(engine) # 新增 hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson") hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador") hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48) hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32) hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35) hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36) hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93) with Session(engine) as session: session.add(hero_1) session.add(hero_2) session.add(hero_3) session.add(hero_4) session.add(hero_5) session.add(hero_6) session.add(hero_7) session.commit() # 查询 with Session(engine) as session: hero = session.get(Hero, 1) print("Hero:", hero)
如果ID不存在,需不需要抛出异常?不需要
所以我这里决定选first这个方法。
def get(engine, model, id):
"""
根据ID查询数据
:param engine: 连接数据库的引擎对象
:param model: 模型类
:param id: 要查找的id的值
:return: id对应的模型对象,如果不存在返回None
"""
with Session(engine) as session:
return session.exec(select(model).where(model.id == id)).first()
测试代码如下:
from typing import Optional from sqlmodel import Field, SQLModel import fastzdp_sqlmodel as fsqlmodel class User(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) name: str age: Optional[int] = None # 创建数据库引擎 engine = fsqlmodel.get_engine(database="fastzdp_sqlmodel") # 初始化表格 fsqlmodel.init_table(engine) # 创建数据 u = User(name="张三", age=23) fsqlmodel.add(engine, u) u = User(name="张三", age=33) fsqlmodel.add(engine, u) u = User(name="李四", age=24) fsqlmodel.add(engine, u) # 查询id为1的数据 u = fsqlmodel.get(engine, User, 1) print(u) # 查询id为11的数据 u = fsqlmodel.get(engine, User, 11) print(u)
输出结果:
从输出结果来看,这个方法是比较符合预期的。
from typing import Optional from sqlmodel import Field, SQLModel, create_engine, Session, select, or_ class Hero(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) name: str = Field(index=True) secret_name: str age: int | None = Field(default=None, index=True) # 创建数据库引擎 sqlite_url = "mysql+pymysql://root:root@127.0.0.1:3306/fastzdp_sqlmodel?charset=utf8mb4" engine = create_engine(sqlite_url, echo=True) # 创建所有表 SQLModel.metadata.drop_all(engine) SQLModel.metadata.create_all(engine) # 新增 hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson") hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador") hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48) hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32) hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35) hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36) hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93) with Session(engine) as session: session.add(hero_1) session.add(hero_2) session.add(hero_3) session.add(hero_4) session.add(hero_5) session.add(hero_6) session.add(hero_7) session.commit() # 查询 with Session(engine) as session: statement = select(Hero).where(Hero.name == "Spider-Boy") results = session.exec(statement) hero = results.one() print("Hero:", hero) hero.age = 16 session.add(hero) session.commit() session.refresh(hero) print("Updated hero:", hero)
def update(engine, model_obj, update_dict): """ 修改数据 :param engine: 连接数据库的引擎对象 :param model_obj: 模型对象 :param update_dict: 更新字典 :return: """ with Session(engine) as session: if not isinstance(update_dict, dict): return for k, v in update_dict.items(): if hasattr(model_obj, k): setattr(model_obj, k, v) session.add(model_obj) session.commit() session.refresh(model_obj) return model_obj
from typing import Optional from sqlmodel import Field, SQLModel import fastzdp_sqlmodel as fsqlmodel class User(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) name: str age: Optional[int] = None # 创建数据库引擎 engine = fsqlmodel.get_engine(database="fastzdp_sqlmodel") # 初始化表格 fsqlmodel.init_table(engine) # 创建数据 u = User(name="张三", age=23) fsqlmodel.add(engine, u) # 查询id为1的数据 u = fsqlmodel.get(engine, User, 1) print(u) # 修改 update_dict = {"name": "张三333"} fsqlmodel.update(engine, u, update_dict) # 查询id为1的数据 u = fsqlmodel.get(engine, User, 1) print(u)
测试结果如下:
从测试结果来看是基本符合预期的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。