当前位置:   article > 正文

关于SQLModel库的一些封装思路和实战,fastzdp_sqlmodel库封装过程中的一些笔记,20240817

关于SQLModel库的一些封装思路和实战,fastzdp_sqlmodel库封装过程中的一些笔记,20240817

概述

最近在对SQLModel做一些封装,本篇文章主要记录封装过程中的一些思路和实战代码。

实现or查询

原本的代码

from typing import Optional
from sqlmodel import Field, SQLModel, create_engine, Session, select, or_


class Hero(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    secret_name: str
    age: Optional[int] = None


# 创建数据库引擎
sqlite_url = "mysql+pymysql://root:root@127.0.0.1:3306/fastzdp_sqlmodel?charset=utf8mb4"
engine = create_engine(sqlite_url, echo=True)

# 创建所有表
SQLModel.metadata.drop_all(engine)
SQLModel.metadata.create_all(engine)

# 新增
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
    session.add(hero_1)
    session.add(hero_2)
    session.add(hero_3)
    session.add(hero_4)
    session.add(hero_5)
    session.add(hero_6)
    session.add(hero_7)
    session.commit()

# or 查询
with Session(engine) as session:
    statement = select(Hero).where(or_(Hero.age <= 35, Hero.age > 90))
    results = session.exec(statement)
    for hero in results:
        print(hero)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

思考

这个代码用起来虽然已经比较简单了,但是我觉得还可以更简单,更灵活,所以想要进一步封装一个方法,能够轻松的支持or查询。

初步设计

利用反射的一些特性,我初步做了如下方法的一个封装。

def get_by_dict_or(engine, model, query_dict: dict):
    """
    根据查询字典指定的键值对查询数据,键值对之间的关系是or的关系
    :param engine: 引擎对象
    :param model: 模型类
    :param query_dict: 查询字典 {age:{gt:3,lt:33}}
    """
    with Session(engine) as session:
        query = select(model)
        if isinstance(query_dict, dict):
            for k, v in query_dict.items():
                if hasattr(model, k):
                    if isinstance(v, dict):
                        # or_(Hero.age <= 35, Hero.age > 90)
                        mk = getattr(model, k)
                        conditions = []
                        for kk, vv in v.items():
                            if kk == "==":
                                conditions.append(mk == vv)
                            elif kk == ">":
                                conditions.append(mk > vv)
                            elif kk == "<":
                                conditions.append(mk < vv)
                            elif kk == ">=":
                                conditions.append(mk >= vv)
                            elif kk == "<=":
                                conditions.append(mk <= vv)
                        query = query.where(or_(*conditions))
        results = session.exec(query)
        return results.all()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

初步测试

这个方法能不能行呢?

先写个简单的测试代码测一下。

from typing import Optional
from sqlmodel import Field, SQLModel
import fastzdp_sqlmodel as fsqlmodel


class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    age: Optional[int] = None


# 创建数据库引擎
engine = fsqlmodel.get_engine(database="fastzdp_sqlmodel")

# 初始化表格
fsqlmodel.init_table(engine)

# 创建数据
u = User(name="张三", age=23)
fsqlmodel.add(engine, u)
u = User(name="张三", age=33)
fsqlmodel.add(engine, u)
u = User(name="李四", age=24)
fsqlmodel.add(engine, u)

# 查询年龄等于23或者小于10的用户
query = {"age": {">": 23, "<": 10}}
users = fsqlmodel.get_by_dict_or(engine, User, query)
print(users)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

执行结果如下:
在这里插入图片描述

初步来看是比较符合预期的。

突然的想法

这种or字典查询的方式看上去比较易懂,能不能支持and查询呢。

比如 {age:{>:20, <:30}} 不就是一个非常容易看懂的range查询吗?

所以我决定对上面的这个方法做一个改造,封装另一个方法。

初步方法封装

将之前的方法复制过来改个名就行了。

def get_by_dict_and(engine, model, query_dict: dict):
    """
    根据查询字典指定的键值对查询数据,键值对之间的关系是and的关系
    :param engine: 引擎对象
    :param model: 模型类
    :param query_dict: 查询字典 {age:{gt:3,lt:33}}
    """
    with Session(engine) as session:
        query = select(model)
        if isinstance(query_dict, dict):
            for k, v in query_dict.items():
                if hasattr(model, k):
                    if isinstance(v, dict):
                        # or_(Hero.age <= 35, Hero.age > 90)
                        mk = getattr(model, k)
                        conditions = []
                        for kk, vv in v.items():
                            if kk == "==":
                                conditions.append(mk == vv)
                            elif kk == ">":
                                conditions.append(mk > vv)
                            elif kk == "<":
                                conditions.append(mk < vv)
                            elif kk == ">=":
                                conditions.append(mk >= vv)
                            elif kk == "<=":
                                conditions.append(mk <= vv)
                        query = query.where(*conditions)
        results = session.exec(query)
        return results.all()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

稍微测试一下

也是在上一个测试代码的基础上稍微改造一下。

from typing import Optional
from sqlmodel import Field, SQLModel
import fastzdp_sqlmodel as fsqlmodel


class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    age: Optional[int] = None


# 创建数据库引擎
engine = fsqlmodel.get_engine(database="fastzdp_sqlmodel")

# 初始化表格
fsqlmodel.init_table(engine)

# 创建数据
u = User(name="张三", age=23)
fsqlmodel.add(engine, u)
u = User(name="张三", age=33)
fsqlmodel.add(engine, u)
u = User(name="李四", age=24)
fsqlmodel.add(engine, u)

# 查询年龄等于20或者小于30的用户
query = {"age": {">": 20, "<": 30}}
users = fsqlmodel.get_by_dict_and(engine, User, query)
print(users)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

输出结果如下:
在这里插入图片描述

初步来看,也是非常符合预期的。

遗留问题

实际上上面的代码还有不少遗留的问题,比如我想要查询名字是张三或者年龄大于20的怎么查询呢?

不过我们有了初步的方法以后,后面就是增加功能和优化了,暂时不急。

我决定先把整个框架的大体架子给搭建起来。

封装根据ID查询的方法

原本的代码1

from typing import Optional
from sqlmodel import Field, SQLModel, create_engine, Session, select, or_


class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    secret_name: str
    age: int | None = Field(default=None, index=True)


# 创建数据库引擎
sqlite_url = "mysql+pymysql://root:root@127.0.0.1:3306/fastzdp_sqlmodel?charset=utf8mb4"
engine = create_engine(sqlite_url, echo=True)

# 创建所有表
SQLModel.metadata.drop_all(engine)
SQLModel.metadata.create_all(engine)

# 新增
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
    session.add(hero_1)
    session.add(hero_2)
    session.add(hero_3)
    session.add(hero_4)
    session.add(hero_5)
    session.add(hero_6)
    session.add(hero_7)
    session.commit()

# 查询
with Session(engine) as session:
    statement = select(Hero).where(Hero.id == 1)
    results = session.exec(statement)
    hero = results.first()
    print("Hero:", hero)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

原本的代码2

from typing import Optional
from sqlmodel import Field, SQLModel, create_engine, Session, select, or_


class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    secret_name: str
    age: int | None = Field(default=None, index=True)


# 创建数据库引擎
sqlite_url = "mysql+pymysql://root:root@127.0.0.1:3306/fastzdp_sqlmodel?charset=utf8mb4"
engine = create_engine(sqlite_url, echo=True)

# 创建所有表
SQLModel.metadata.drop_all(engine)
SQLModel.metadata.create_all(engine)

# 新增
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
    session.add(hero_1)
    session.add(hero_2)
    session.add(hero_3)
    session.add(hero_4)
    session.add(hero_5)
    session.add(hero_6)
    session.add(hero_7)
    session.commit()

# 查询
with Session(engine) as session:
    hero = session.get(Hero, 1)
    print("Hero:", hero)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

简单的思考

如果ID不存在,需不需要抛出异常?不需要

所以我这里决定选first这个方法。

初步封装

def get(engine, model, id):
    """
    根据ID查询数据
    :param engine: 连接数据库的引擎对象
    :param model: 模型类
    :param id: 要查找的id的值
    :return: id对应的模型对象,如果不存在返回None
    """
    with Session(engine) as session:
        return session.exec(select(model).where(model.id == id)).first()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

简单的测试

测试代码如下:

from typing import Optional
from sqlmodel import Field, SQLModel
import fastzdp_sqlmodel as fsqlmodel


class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    age: Optional[int] = None


# 创建数据库引擎
engine = fsqlmodel.get_engine(database="fastzdp_sqlmodel")

# 初始化表格
fsqlmodel.init_table(engine)

# 创建数据
u = User(name="张三", age=23)
fsqlmodel.add(engine, u)
u = User(name="张三", age=33)
fsqlmodel.add(engine, u)
u = User(name="李四", age=24)
fsqlmodel.add(engine, u)

# 查询id为1的数据
u = fsqlmodel.get(engine, User, 1)
print(u)

# 查询id为11的数据
u = fsqlmodel.get(engine, User, 11)
print(u)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

输出结果:
在这里插入图片描述

从输出结果来看,这个方法是比较符合预期的。

封装修改数据的方法

原本的代码

from typing import Optional
from sqlmodel import Field, SQLModel, create_engine, Session, select, or_


class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    secret_name: str
    age: int | None = Field(default=None, index=True)


# 创建数据库引擎
sqlite_url = "mysql+pymysql://root:root@127.0.0.1:3306/fastzdp_sqlmodel?charset=utf8mb4"
engine = create_engine(sqlite_url, echo=True)

# 创建所有表
SQLModel.metadata.drop_all(engine)
SQLModel.metadata.create_all(engine)

# 新增
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
    session.add(hero_1)
    session.add(hero_2)
    session.add(hero_3)
    session.add(hero_4)
    session.add(hero_5)
    session.add(hero_6)
    session.add(hero_7)
    session.commit()

# 查询
with Session(engine) as session:
    statement = select(Hero).where(Hero.name == "Spider-Boy")
    results = session.exec(statement)
    hero = results.one()
    print("Hero:", hero)

    hero.age = 16
    session.add(hero)
    session.commit()
    session.refresh(hero)
    print("Updated hero:", hero)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

简单的封装

def update(engine, model_obj, update_dict):
    """
    修改数据
    :param engine: 连接数据库的引擎对象
    :param model_obj: 模型对象
    :param update_dict: 更新字典
    :return:
    """
    with Session(engine) as session:
        if not isinstance(update_dict, dict):
            return
        for k, v in update_dict.items():
            if hasattr(model_obj, k):
                setattr(model_obj, k, v)
        session.add(model_obj)
        session.commit()
        session.refresh(model_obj)
        return model_obj
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

测试

from typing import Optional
from sqlmodel import Field, SQLModel
import fastzdp_sqlmodel as fsqlmodel


class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    age: Optional[int] = None


# 创建数据库引擎
engine = fsqlmodel.get_engine(database="fastzdp_sqlmodel")

# 初始化表格
fsqlmodel.init_table(engine)

# 创建数据
u = User(name="张三", age=23)
fsqlmodel.add(engine, u)

# 查询id为1的数据
u = fsqlmodel.get(engine, User, 1)
print(u)

# 修改
update_dict = {"name": "张三333"}
fsqlmodel.update(engine, u, update_dict)

# 查询id为1的数据
u = fsqlmodel.get(engine, User, 1)
print(u)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

测试结果如下:
在这里插入图片描述

从测试结果来看是基本符合预期的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/999088
推荐阅读
相关标签
  

闽ICP备14008679号