当前位置:   article > 正文

flask-sqlalchemy中的基本sql操作_flasksqlachemy子查询

flasksqlachemy子查询


更新

# 批量更新某表多条数据的某些字段
HeTongTaiZhangLaoWu.query.filter_by(xiang_mu_id=xiang_mu_id).update({"is_delete": True})

# 批量更新某表不同数据不同字段,更新数据中需要有主键
update_data = [
    {'id': 1, 'is_delete': True, 'name': '张三'},
    {'id': 2, 'is_delete': False, 'age': 20}
]
# 注意这种方式并不能触发orm层级的更新,即涉及到关联关系的属性并不能更新,只能对本表内的属性进行更新
db.session.bulk_update_mappings(RenYuanXinXi, update_data)

# 更新单条数据
ren_yuan_xin_xi.name = '张三'  # ren_yuan_xin_xi是数据库查询的实例化对象
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

新增

# 创建一条
db.session.add(RenYuanXinXi(name='张三'))

# 批量创建数据,同表数据
create_data = [
    {'name': '张三', 'age': 20},
    {'name': '李四', 'age': 22},
]
# 注意这种方式并不能触发orm层级的更新,即涉及到关联关系的属性并不能新增,只能对本表内的属性进行新增
db.session.bulk_insert_mappings(RenYuanXinXi, create_data)

# 批量创建数据,不同表数据
create_data = []
create_data.append(RenYuanXinXi(name='张三'))
create_data.append(XiangMuYunXingZhiBiaoCanShuShiJianSheZhi(suo_shu_zu_zhi=1))
db.session.add_all(create_data)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

查询

查询的顺序

在Flask-SQLAlchemy中,查询语句的执行顺序并不是按照你在代码中写的顺序来执行的。实际上,查询语句的执行顺序是由SQLAlchemy的ORM(对象关系映射)层在后台处理的。
以下是一般的执行顺序:

  1. db.session.query():这是开始一个查询的起点,你在这里指定你想要查询的模型或者字段。
  2. filter():这个方法在查询中添加WHERE子句,用于过滤结果。
  3. group_by():这个方法在查询中添加GROUP BY子句,用于将结果集按照某个字段进行分组。
  4. 聚合函数(如db.func.max):这些函数通常在GROUP BY子句之后使用,用于对每个分组进行某种计算,如求最大值、最小值、平均值等。
  5. order_by():这个方法在查询中添加ORDER BY子句,用于对结果集进行排序。
  6. 最后,当你调用all()、first()等方法时,SQLAlchemy会将整个查询转换为SQL语句并在数据库中执行。

这个执行顺序是由SQL标准和SQLAlchemy的ORM层决定的,和你在代码中调用这些方法的顺序无关。你可以在代码中以任何顺序调用这些方法,SQLAlchemy会在后台正确地组织它们。

基本查询

# 查询符合条件的第一条, 没有返回None
ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.suo_shu_zu_zhi == zu_zhi_id, ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).first()

# 查询符合条件的所有, 没有返回空的查询集
ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.suo_shu_zu_zhi == zu_zhi_id, ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).all()

# 预加载数据
ManYiDuDiaoChaShiJianSheZhi.query.options(selectinload(ManYiDuDiaoChaShiJianSheZhi.can_shus)).filter(ManYiDuDiaoChaShiJianSheZhi.id == subquery).first()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

查询id最大的一条数据

shi_jian = ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.suo_shu_zu_zhi == zu_zhi_id, ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).order_by(ManYiDuDiaoChaShiJianSheZhi.id.desc()).first()
  • 1

查询关联表id最大的所有数据

                    subquery = db.session.query(db.func.max(XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.id)).filter(
                        XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.suo_shu_xiang_mu == zu_zhi_id).scalar_subquery()
                    db_data = db_class.query.options(selectinload(db_class.xiang_mu_shi_jian_she_zhi)).filter(
                        db_class.suo_shu_xiang_mu_ == zu_zhi_id, db_class.shi_jian_she_zhi_id == subquery).all()
  • 1
  • 2
  • 3
  • 4

连接两表查询

                    subquery = db.session.query(db.func.max(XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.id)).filter(
                        XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.suo_shu_xiang_mu == zu_zhi_id).scalar_subquery()
                    db_data = db_class.query.join(XiangMuYunXingZhiBiaoCanShu).options(
                        selectinload(db_class.xiang_mu_shi_jian_she_zhi),
                        selectinload(db_class.xiang_mu_yun_xing_zhi_biao_can_shu)).filter(
                            db_class.suo_shu_xiang_mu_ == zu_zhi_id, db_class.shi_jian_she_zhi_id == subquery,
                            XiangMuYunXingZhiBiaoCanShu.zhi_biao_type == zhi_biao).all()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

查询年龄在30以下、30-40、40-50、50以上的人数,人员按照身份证号码去重

        nian_ling_ren_shu = (
            db.session.query(
                db.func.count().label("count"),
                db.case(
                    (FenBaoShangRenYuanZhuCeBiao.nian_ling < 30, "30岁以下"),
                    (FenBaoShangRenYuanZhuCeBiao.nian_ling.between(30, 50), "30-50岁"),
                    (FenBaoShangRenYuanZhuCeBiao.nian_ling > 50, "50岁以上"),
                    else_="其他",
                ).label("nian_ling"),
            )
            .distinct(FenBaoShangRenYuanZhuCeBiao.shen_fen_zheng_hao_ma)
            .group_by("nian_ling")
            .all()
        )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

子查询构建指定数组内的数据

        tong_ji_yue_fen_subquery = (
            db.session.query(db.func.distinct(model.tong_ji_yue_fen))
            .filter(
                model.zu_zhi_id == 5,
                model.tong_ji_yue_fen >= start_time,
                model.tong_ji_yue_fen < end_time,
            )
            .scalar_subquery()
        )
        if data_type == "last":
            filters.append(model.tong_ji_yue_fen.in_(tong_ji_yue_fen_subquery))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

按照datetime类型日期,忽略时分秒分组

    result = (
        db.session.query(
            YueDuWuZiXuQiuJiHua,
            db.func.date_format(YueDuWuZiXuQiuJiHua.tong_ji_yue_fen, "%Y-%m-%d").label("day"),
            db.func.sum(YueDuWuZiXuQiuJiHua.shu_liang).label("shu_liang_zong_ji"),
        )
        .filter(YueDuWuZiXuQiuJiHua.xiang_mu_id == xiang_mu_id, YueDuWuZiXuQiuJiHua.is_delete == false())
        .group_by(YueDuWuZiXuQiuJiHua.md5, "day")
        .all()
    )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

巧用多次子查询

    # 子查询,log数据中按照bill_id分组,去除每组操作时间最晚数据,得到log_id列表
    sub_query = (
        db.session.query(AppDataWorkFlowLog.bill_id, db.func.max(AppDataWorkFlowLog.id).label("max_id"))
        .filter(
            AppDataWorkFlowLog.optid == current_user.id,
            AppDataWorkFlowLog.status.in_(["PROCESSING", "FINISHED", "SENDBACK"]),
            AppDataWorkFlowLog.biao_dan_type == biao_dan_type,
        )
        .group_by(AppDataWorkFlowLog.bill_id)
        .subquery()
    )

    bill_ids = db.session.query(sub_query.c.bill_id).scalar_subquery()

    workflow_bills = (
        db.session.query(AppDataWorkFlowBill).filter(AppDataWorkFlowBill.id.in_(bill_ids), *bill_filters).all()
    )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

如果要根据某几个字段进行group_by,并且对某个字段求最大值,并找到最大值对应的数据

# 需要先建立子查询,然后根据子查询进行逐一字段匹配进行查询
sub_query = (
        db.session.query(
            GongChengZaoJiaTongJiDanXiangGongCheng.zu_zhi_id,
            GongChengZaoJiaTongJiDanXiangGongCheng.dan_xiang_gong_cheng_id,
            GongChengZaoJiaTongJiDanXiangGongCheng.zhuan_ye_lei_bie,
            db.func.max(GongChengZaoJiaTongJiDanXiangGongCheng.tong_ji_yue_fen).label("max_date"),
        )
        .filter(
            GongChengZaoJiaTongJiDanXiangGongCheng.zu_zhi_id.in_(xiang_mu_ids),
            GongChengZaoJiaTongJiDanXiangGongCheng.is_delete == false(),
        )
        .group_by(
            GongChengZaoJiaTongJiDanXiangGongCheng.zu_zhi_id,
            GongChengZaoJiaTongJiDanXiangGongCheng.dan_xiang_gong_cheng_id,
            GongChengZaoJiaTongJiDanXiangGongCheng.zhuan_ye_lei_bie,
        )
        .subquery()
    )
    current_app.logger.info(sub_query)

    zu_zhi_id_and_max_date = (
        db.session.query(
            GongChengZaoJiaTongJiDanXiangGongCheng,
        )
        .filter(
            GongChengZaoJiaTongJiDanXiangGongCheng.is_delete == false(),
            GongChengZaoJiaTongJiDanXiangGongCheng.zu_zhi_id == sub_query.c.zu_zhi_id,
            GongChengZaoJiaTongJiDanXiangGongCheng.dan_xiang_gong_cheng_id == sub_query.c.dan_xiang_gong_cheng_id,
            GongChengZaoJiaTongJiDanXiangGongCheng.zhuan_ye_lei_bie == sub_query.c.zhuan_ye_lei_bie,
            GongChengZaoJiaTongJiDanXiangGongCheng.tong_ji_yue_fen == sub_query.c.max_date,
        )
        .all()
    )
    current_app.logger.info(zu_zhi_id_and_max_date)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

过滤某个字符串类型的字段不包含“-”的数据

        data = GongChengChengBenZhengTiHeSuanGcxm.query.filter(
            GongChengChengBenZhengTiHeSuanGcxm.is_delete == false(),
            GongChengChengBenZhengTiHeSuanGcxm.tong_ji_yue_fen >= start_time,
            GongChengChengBenZhengTiHeSuanGcxm.tong_ji_yue_fen <= end_time,
            GongChengChengBenZhengTiHeSuanGcxm.xiang_mu_id == zu_zhi_id,
            GongChengChengBenZhengTiHeSuanGcxm.cheng_ben_bian_ma.notlike("%-%"),
        ).all()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

枚举

根据枚举值获取枚举常量

class ShiYongDanWeiType(enum.Enum):
    lao_wu_fen_bao = '劳务分包'
    zhuan_ye_fen_bao = '专业分包'
    zong_bao_dan_wei = '总包单位'
    qi_ta_dan_wei = '其他单位'
    
# 得到枚举常量
ShiYongDanWeiType('总包单位') # 通过value获得
ShiYongDanWeiType.zong_bao_dan_wei
ShiYongDanWeiType["zong_bao_dan_wei"] # 通过name获得

# 获取枚举key
ShiYongDanWeiType.lao_wu_fen_bao.name  # 返回 lao_wu_fen_bao 字符串

# 获取枚举value
ShiYongDanWeiType.lao_wu_fen_bao.value  # 返回 劳务分包 字符串

# 枚举类查询
class ZhaoTouBiaoWenJianType(enum.Enum):
    zhao_biao_wen_jian = '招标文件'
    tou_biao_wen_jian = '投标文件'
ZhaoTouBiaoWenJian.query.filter(ZhaoTouBiaoWenJian.wen_jian_lei_xing == 'zhao_biao_wen_jian').all()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

打印

.__dict__

# db.session.query方式查询出来的数据打印
._asdict()

# 打印 SQL 语句
print(db.session.query(*querys).filter(*filters).group_by(*group_by).statement)

# 打印查询参数
print(db.session.query(*querys).filter(*filters).group_by(*group_by).params)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

创建或更新后立即获取新数据的相关属性

    # 创建
    record=[]
    dan_xiang_gong_chengs=[
        {
            "suo_shu_xiang_mu": 1,
        },
        {
            "suo_shu_xiang_mu": 2,
        }
    ]
    for obj in dan_xiang_gong_chengs:
        record.append(DanXiangGongCheng(**obj))
    db.session.add_all(record)
    db.session.commit()
    for obj in record:
        print('+++++++++++++++',obj.id)
    # 更新
    db_objs = DanXiangGongCheng.query.filter(DanXiangGongCheng.id.in_([4,5])).all()
    for obj in db_objs:
        # ㎡
        obj.gui_mo_dan_wei='m'
    db.session.commit()
    for obj in db_objs:
        print(obj.gui_mo_dan_wei)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

当需要获取日期类型的属性时,上述方式有一个需要注意的点:

    # 创建
    record = []
    dan_xiang_gong_chengs = [
        {
            "suo_shu_xiang_mu": 1,
            "created_at": "2021-01-01",
        },
        {
            "suo_shu_xiang_mu": 2,
        },
    ]
    for obj in dan_xiang_gong_chengs:
        record.append(DanXiangGongCheng(**obj))
    db.session.add_all(record)
    db.session.flush()
    for obj in record:
        print("+++++++++++++++", type(obj.created_at))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

这种情况下,第一次会输出<class 'str'>,第二次会输出<class 'datetime.datetime'>。也就是说,手动创建时间类型的数据时,flush之后,自己创建时是什么类型,获取到的还是什么类型;数据库默认创建的时间类型是datetime类型的。

多对多关系的增加与修改

# 增加
    dan_xiang_gong_chengs = DanXiangGongCheng.query.filter(DanXiangGongCheng.id.in_([1, 2])).all()
    data = {"min_zu": "汉族", "shi_yong_bu_wei": dan_xiang_gong_chengs}
    db_data = FenBaoShangRenYuanZhuCeBiao(**data)
    db.session.add(db_data)
    db.session.commit()

# 修改
    dan_xiang_gong_chengs = DanXiangGongCheng.query.filter(DanXiangGongCheng.id.in_([4, 5])).all()
    data = {"min_zu": "傣族", "shi_yong_bu_wei": dan_xiang_gong_chengs}
    db_data = FenBaoShangRenYuanZhuCeBiao.query.filter_by(id=128).first()
    for key, value in data.items():
        setattr(db_data, key, value)
    db.session.commit()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

数据监听

使用sqlalchemy 中的event 接口进行表操作监听,可以使用event.listen() 或者event.listens_for() 装饰器

  1. 使用限制:只有单个插入数据,或者obj.field_name = new_value 的更新方式能够触发listen,其他的更新或插入方式无法触发
  2. 如果回调函数中是对数据库对象本身进行操作,那么操作完后不需要单独提交事务
# 定义 事件监听器函数,在 插入/更新 数据之前计算
def calculate_xiang_mu_gai_kuang(mapper, connection, target):
    if target.mate_info is None:
        target.mate_info = {"illegal_fields": []}

    if "illegal_fields" not in target.mate_info:
        target.mate_info["illegal_fields"] = []

    fields_to_check = ["ben_qi_ji_hua_wan_cheng_lv", "lei_ji_ji_hua_wan_cheng_lv"]

    for field in fields_to_check:
        if getattr(target, field) is None:
            if field not in target.mate_info["illegal_fields"]:
                target.mate_info["illegal_fields"].append(field)
        else:
            if field in target.mate_info["illegal_fields"]:
                target.mate_info["illegal_fields"].remove(field)

# 将事件监听器与数据模型关联
event.listen(XiangMuGaiKuang, "before_insert", calculate_xiang_mu_gai_kuang, propagate=True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

使用listens_for 装饰器举例
listens_for 中需要指定需要监听的字段,如果有多个字段,可以使用多个装饰器

@event.listens_for(DanXiangGongCheng.ji_chu_lei_xing, "set")
def on_ren_yuan_xin_xi_changed(target, value, old_value, initiator):
    print(11111)
    target.is_tong_bu_super = false()
  • 1
  • 2
  • 3
  • 4

通过query语句获取当前查询model

query.column_descriptions[0]["type"]
# 或者
query.column_descriptions[0]["expr"]
  • 1
  • 2
  • 3
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/521826
推荐阅读
相关标签
  

闽ICP备14008679号