# Batch-update certain fields of multiple rows in one table
HeTongTaiZhangLaoWu.query.filter_by(xiang_mu_id=xiang_mu_id).update({"is_delete": True})
# Batch-update different fields on different rows of one table; each dict must contain the primary key
update_data = [
{'id': 1, 'is_delete': True, 'name': '张三'},
{'id': 2, 'is_delete': False, 'age': 20}
]
# Note: this bypasses ORM-level behaviour, so relationship attributes cannot be updated this way; only columns of this table are updated
db.session.bulk_update_mappings(RenYuanXinXi, update_data)
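If ORM-level behaviour (events, relationship handling) is required, a slower per-instance update can be used instead. A minimal sketch, assuming the update_data list above and a primary key column named id:
rows = RenYuanXinXi.query.filter(RenYuanXinXi.id.in_([d["id"] for d in update_data])).all()
by_id = {d["id"]: d for d in update_data}
for row in rows:
    for key, value in by_id[row.id].items():
        if key != "id":
            setattr(row, key, value)  # goes through the ORM unit of work, so events and relationships apply
db.session.commit()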
# Update a single row
ren_yuan_xin_xi.name = '张三'  # ren_yuan_xin_xi is a model instance returned by a query
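The attribute change on a loaded instance only reaches the database once the session is flushed or committed; with the default session settings a commit is still needed afterwards:
db.session.commit()  # persist the change made on ren_yuan_xin_xi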
# Create a single row
db.session.add(RenYuanXinXi(name='张三'))
# Batch-create rows for the same table
create_data = [
    {'name': '张三', 'age': 20},
    {'name': '李四', 'age': 22},
]
# Note: this bypasses ORM-level behaviour, so relationship attributes cannot be set this way; only columns of this table are inserted
db.session.bulk_insert_mappings(RenYuanXinXi, create_data)
# Batch-create rows for different tables
create_data = []
create_data.append(RenYuanXinXi(name='张三'))
create_data.append(XiangMuYunXingZhiBiaoCanShuShiJianSheZhi(suo_shu_zu_zhi=1))
db.session.add_all(create_data)
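One caveat with bulk_insert_mappings: by default it does not write the generated primary keys back into the dicts. If the new ids are needed, return_defaults=True can be passed (at some cost in speed). A sketch, assuming the dict-based create_data above and a primary key column named id:
db.session.bulk_insert_mappings(RenYuanXinXi, create_data, return_defaults=True)
print([row.get("id") for row in create_data])  # the generated primary keys are written back into each dict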
In Flask-SQLAlchemy, a query is not executed clause by clause in the order you write the methods in code; SQLAlchemy's ORM (object-relational mapping) layer assembles the statement behind the scenes.
The general (logical) execution order of the resulting SQL is: FROM (including JOINs) → WHERE → GROUP BY → HAVING → SELECT → DISTINCT → ORDER BY → LIMIT/OFFSET.
This order is dictated by the SQL standard and SQLAlchemy's ORM layer, not by the order in which you call the query methods. You can chain filter(), order_by(), group_by() and so on in any order, and SQLAlchemy will organize them correctly in the generated statement.
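A minimal illustration using one of the models from the examples below: the generative query methods can be chained in either order and the rendered SQL comes out the same.
q1 = ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).order_by(ManYiDuDiaoChaShiJianSheZhi.id)
q2 = ManYiDuDiaoChaShiJianSheZhi.query.order_by(ManYiDuDiaoChaShiJianSheZhi.id).filter(ManYiDuDiaoChaShiJianSheZhi.is_delete == false())
print(str(q1) == str(q2))  # True: both render SELECT ... WHERE ... ORDER BY ... in the standard clause order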
# Return the first row matching the filters; returns None if there is no match
ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.suo_shu_zu_zhi == zu_zhi_id, ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).first()
# Return all matching rows; returns an empty list if there is no match
ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.suo_shu_zu_zhi == zu_zhi_id, ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).all()
# Eager-load related data (selectinload is imported from sqlalchemy.orm)
ManYiDuDiaoChaShiJianSheZhi.query.options(selectinload(ManYiDuDiaoChaShiJianSheZhi.can_shus)).filter(ManYiDuDiaoChaShiJianSheZhi.id == subquery).first()
shi_jian = ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.suo_shu_zu_zhi == zu_zhi_id, ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).order_by(ManYiDuDiaoChaShiJianSheZhi.id.desc()).first()
subquery = db.session.query(db.func.max(XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.id)).filter(
XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.suo_shu_xiang_mu == zu_zhi_id).scalar_subquery()
db_data = db_class.query.options(selectinload(db_class.xiang_mu_shi_jian_she_zhi)).filter(
db_class.suo_shu_xiang_mu_ == zu_zhi_id, db_class.shi_jian_she_zhi_id == subquery).all()
subquery = db.session.query(db.func.max(XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.id)).filter(
XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.suo_shu_xiang_mu == zu_zhi_id).scalar_subquery()
db_data = db_class.query.join(XiangMuYunXingZhiBiaoCanShu).options(
selectinload(db_class.xiang_mu_shi_jian_she_zhi),
selectinload(db_class.xiang_mu_yun_xing_zhi_biao_can_shu)).filter(
db_class.suo_shu_xiang_mu_ == zu_zhi_id, db_class.shi_jian_she_zhi_id == subquery,
XiangMuYunXingZhiBiaoCanShu.zhi_biao_type == zhi_biao).all()
nian_ling_ren_shu = (
db.session.query(
db.func.count().label("count"),
db.case(
(FenBaoShangRenYuanZhuCeBiao.nian_ling < 30, "30岁以下"),
(FenBaoShangRenYuanZhuCeBiao.nian_ling.between(30, 50), "30-50岁"),
(FenBaoShangRenYuanZhuCeBiao.nian_ling > 50, "50岁以上"),
else_="其他",
).label("nian_ling"),
)
.distinct(FenBaoShangRenYuanZhuCeBiao.shen_fen_zheng_hao_ma)
.group_by("nian_ling")
.all()
)
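Each element of the result is a named-tuple-like row, so the labelled columns can be read by their .label() names; a short sketch of consuming the result above, using the same ._asdict() helper mentioned later in this note:
for row in nian_ling_ren_shu:
    data = row._asdict()  # maps the labels to their values
    print(data["nian_ling"], data["count"])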
tong_ji_yue_fen_subquery = (
db.session.query(db.func.distinct(model.tong_ji_yue_fen))
.filter(
model.zu_zhi_id == 5,
model.tong_ji_yue_fen >= start_time,
model.tong_ji_yue_fen < end_time,
)
.scalar_subquery()
)
if data_type == "last":
filters.append(model.tong_ji_yue_fen.in_(tong_ji_yue_fen_subquery))
result = (
db.session.query(
YueDuWuZiXuQiuJiHua,
db.func.date_format(YueDuWuZiXuQiuJiHua.tong_ji_yue_fen, "%Y-%m-%d").label("day"),
db.func.sum(YueDuWuZiXuQiuJiHua.shu_liang).label("shu_liang_zong_ji"),
)
.filter(YueDuWuZiXuQiuJiHua.xiang_mu_id == xiang_mu_id, YueDuWuZiXuQiuJiHua.is_delete == false())
.group_by(YueDuWuZiXuQiuJiHua.md5, "day")
.all()
)
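When a mapped entity and labelled columns are selected together, each row exposes the entity under the class name next to the labelled columns. A sketch of reading the result above (assuming YueDuWuZiXuQiuJiHua has an id primary key):
for row in result:
    print(row.YueDuWuZiXuQiuJiHua.id, row.day, row.shu_liang_zong_ji)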
# Subquery: group the log records by bill_id and keep the record with the latest operation time in each group, giving the list of ids
sub_query = (
    db.session.query(AppDataWorkFlowLog.bill_id, db.func.max(AppDataWorkFlowLog.id).label("max_id"))
    .filter(
        AppDataWorkFlowLog.optid == current_user.id,
        AppDataWorkFlowLog.status.in_(["PROCESSING", "FINISHED", "SENDBACK"]),
        AppDataWorkFlowLog.biao_dan_type == biao_dan_type,
    )
    .group_by(AppDataWorkFlowLog.bill_id)
    .subquery()
)
bill_ids = db.session.query(sub_query.c.bill_id).scalar_subquery()
workflow_bills = (
    db.session.query(AppDataWorkFlowBill).filter(AppDataWorkFlowBill.id.in_(bill_ids), *bill_filters).all()
)
# Build the subquery first, then query the main table by matching it field by field against the subquery columns
sub_query = (
    db.session.query(
        GongChengZaoJiaTongJiDanXiangGongCheng.zu_zhi_id,
        GongChengZaoJiaTongJiDanXiangGongCheng.dan_xiang_gong_cheng_id,
        GongChengZaoJiaTongJiDanXiangGongCheng.zhuan_ye_lei_bie,
        db.func.max(GongChengZaoJiaTongJiDanXiangGongCheng.tong_ji_yue_fen).label("max_date"),
    )
    .filter(
        GongChengZaoJiaTongJiDanXiangGongCheng.zu_zhi_id.in_(xiang_mu_ids),
        GongChengZaoJiaTongJiDanXiangGongCheng.is_delete == false(),
    )
    .group_by(
        GongChengZaoJiaTongJiDanXiangGongCheng.zu_zhi_id,
        GongChengZaoJiaTongJiDanXiangGongCheng.dan_xiang_gong_cheng_id,
        GongChengZaoJiaTongJiDanXiangGongCheng.zhuan_ye_lei_bie,
    )
    .subquery()
)
current_app.logger.info(sub_query)
zu_zhi_id_and_max_date = (
    db.session.query(
        GongChengZaoJiaTongJiDanXiangGongCheng,
    )
    .filter(
        GongChengZaoJiaTongJiDanXiangGongCheng.is_delete == false(),
        GongChengZaoJiaTongJiDanXiangGongCheng.zu_zhi_id == sub_query.c.zu_zhi_id,
        GongChengZaoJiaTongJiDanXiangGongCheng.dan_xiang_gong_cheng_id == sub_query.c.dan_xiang_gong_cheng_id,
        GongChengZaoJiaTongJiDanXiangGongCheng.zhuan_ye_lei_bie == sub_query.c.zhuan_ye_lei_bie,
        GongChengZaoJiaTongJiDanXiangGongCheng.tong_ji_yue_fen == sub_query.c.max_date,
    )
    .all()
)
current_app.logger.info(zu_zhi_id_and_max_date)
data = GongChengChengBenZhengTiHeSuanGcxm.query.filter(
GongChengChengBenZhengTiHeSuanGcxm.is_delete == false(),
GongChengChengBenZhengTiHeSuanGcxm.tong_ji_yue_fen >= start_time,
GongChengChengBenZhengTiHeSuanGcxm.tong_ji_yue_fen <= end_time,
GongChengChengBenZhengTiHeSuanGcxm.xiang_mu_id == zu_zhi_id,
GongChengChengBenZhengTiHeSuanGcxm.cheng_ben_bian_ma.notlike("%-%"),
).all()
import enum

class ShiYongDanWeiType(enum.Enum):
    lao_wu_fen_bao = '劳务分包'
    zhuan_ye_fen_bao = '专业分包'
    zong_bao_dan_wei = '总包单位'
    qi_ta_dan_wei = '其他单位'

# Get an enum member
ShiYongDanWeiType('总包单位')  # by value, gives ShiYongDanWeiType.zong_bao_dan_wei
ShiYongDanWeiType["zong_bao_dan_wei"]  # by name
# Get the enum key
ShiYongDanWeiType.lao_wu_fen_bao.name  # returns the string 'lao_wu_fen_bao'
# Get the enum value
ShiYongDanWeiType.lao_wu_fen_bao.value  # returns the string '劳务分包'

# Querying against an Enum column
class ZhaoTouBiaoWenJianType(enum.Enum):
    zhao_biao_wen_jian = '招标文件'
    tou_biao_wen_jian = '投标文件'

ZhaoTouBiaoWenJian.query.filter(ZhaoTouBiaoWenJian.wen_jian_lei_xing == 'zhao_biao_wen_jian').all()
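For the filter above to match against the string 'zhao_biao_wen_jian', the column is assumed to be declared with db.Enum over the Python enum class; with db.Enum the member name (not its value) is what gets stored in the database. The model definition is not shown in the original, so this is a hypothetical sketch:
class ZhaoTouBiaoWenJian(db.Model):  # hypothetical model declaration, for illustration only
    id = db.Column(db.Integer, primary_key=True)
    wen_jian_lei_xing = db.Column(db.Enum(ZhaoTouBiaoWenJianType))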
# Print data returned by Model.query (a mapped model instance)
.__dict__
# Print data returned by db.session.query (a named-tuple-like row)
._asdict()
# Print the generated SQL statement
print(db.session.query(*querys).filter(*filters).group_by(*group_by).statement)
# Print the bound query parameters (via the compiled statement)
print(db.session.query(*querys).filter(*filters).group_by(*group_by).statement.compile().params)
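To see the SQL with the parameter values inlined (for debugging only), the statement can be compiled with literal_binds; a sketch using the same query:
query = db.session.query(*querys).filter(*filters).group_by(*group_by)
print(query.statement.compile(compile_kwargs={"literal_binds": True}))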
# Create
record = []
dan_xiang_gong_chengs = [
    {
        "suo_shu_xiang_mu": 1,
    },
    {
        "suo_shu_xiang_mu": 2,
    }
]
for obj in dan_xiang_gong_chengs:
    record.append(DanXiangGongCheng(**obj))
db.session.add_all(record)
db.session.commit()
for obj in record:
    print('+++++++++++++++', obj.id)  # the generated primary keys are available after commit

# Update
db_objs = DanXiangGongCheng.query.filter(DanXiangGongCheng.id.in_([4, 5])).all()
for obj in db_objs:
    # ㎡
    obj.gui_mo_dan_wei = 'm'
db.session.commit()
for obj in db_objs:
    print(obj.gui_mo_dan_wei)
When the attribute read back is a date/datetime column, there is one thing to watch out for with the approach above:
# Create
record = []
dan_xiang_gong_chengs = [
    {
        "suo_shu_xiang_mu": 1,
        "created_at": "2021-01-01",
    },
    {
        "suo_shu_xiang_mu": 2,
    },
]
for obj in dan_xiang_gong_chengs:
    record.append(DanXiangGongCheng(**obj))
db.session.add_all(record)
db.session.flush()
for obj in record:
    print("+++++++++++++++", type(obj.created_at))
In this case, the first print outputs <class 'str'> and the second outputs <class 'datetime.datetime'>. In other words, when you set a datetime column yourself, after flush() the attribute keeps whatever type you assigned, whereas a datetime column filled in by the database default comes back as a datetime object.
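If a real datetime object is needed right away, either parse the string before assigning it, or reload the row so the value comes back from the database. A sketch, assuming the record list above:
from datetime import datetime

record[0].created_at = datetime.strptime("2021-01-01", "%Y-%m-%d")  # assign a datetime instead of a string
# or, after flush/commit, reload the row from the database:
db.session.refresh(record[0])
print(type(record[0].created_at))  # <class 'datetime.datetime'>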
# Create a row and assign a relationship collection
dan_xiang_gong_chengs = DanXiangGongCheng.query.filter(DanXiangGongCheng.id.in_([1, 2])).all()
data = {"min_zu": "汉族", "shi_yong_bu_wei": dan_xiang_gong_chengs}
db_data = FenBaoShangRenYuanZhuCeBiao(**data)
db.session.add(db_data)
db.session.commit()
# Update a row's relationship collection
dan_xiang_gong_chengs = DanXiangGongCheng.query.filter(DanXiangGongCheng.id.in_([4, 5])).all()
data = {"min_zu": "傣族", "shi_yong_bu_wei": dan_xiang_gong_chengs}
db_data = FenBaoShangRenYuanZhuCeBiao.query.filter_by(id=128).first()
for key, value in data.items():
setattr(db_data, key, value)
db.session.commit()
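Note that assigning a list to shi_yong_bu_wei replaces the whole collection; to add to the existing related objects instead, append or extend the relationship list. A sketch:
db_data.shi_yong_bu_wei.extend(dan_xiang_gong_chengs)  # keeps the existing links and adds the new ones
db.session.commit()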
SQLAlchemy's event interface can be used to listen for table operations, either with event.listen() or with the event.listens_for() decorator.
from sqlalchemy import event

# Define the event listener function; it recomputes the illegal fields before a row is inserted/updated
def calculate_xiang_mu_gai_kuang(mapper, connection, target):
    if target.mate_info is None:
        target.mate_info = {"illegal_fields": []}
    if "illegal_fields" not in target.mate_info:
        target.mate_info["illegal_fields"] = []
    fields_to_check = ["ben_qi_ji_hua_wan_cheng_lv", "lei_ji_ji_hua_wan_cheng_lv"]
    for field in fields_to_check:
        if getattr(target, field) is None:
            if field not in target.mate_info["illegal_fields"]:
                target.mate_info["illegal_fields"].append(field)
        else:
            if field in target.mate_info["illegal_fields"]:
                target.mate_info["illegal_fields"].remove(field)

# Attach the event listener to the model
event.listen(XiangMuGaiKuang, "before_insert", calculate_xiang_mu_gai_kuang, propagate=True)
An example using the listens_for decorator:
listens_for must be given the attribute to listen on; if several attributes need to be watched, stack one decorator per attribute.
@event.listens_for(DanXiangGongCheng.ji_chu_lei_xing, "set")
def on_ren_yuan_xin_xi_changed(target, value, old_value, initiator):
print(11111)
target.is_tong_bu_super = false()
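The decorator form can also register the same mapper-level hooks as event.listen(); a sketch of attaching the earlier before_insert listener with it (same arguments, just decorator syntax):
@event.listens_for(XiangMuGaiKuang, "before_insert", propagate=True)
def calculate_xiang_mu_gai_kuang(mapper, connection, target):
    ...  # same body as in the event.listen() example above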
# Inspect what a query selects (each entry describes one selected entity/column)
query.column_descriptions[0]["type"]
# or
query.column_descriptions[0]["expr"]