当前位置:   article > 正文

Flask-SQLAlchemy学习总结

Flask-SQLAlchemy学习总结

1)连接数据库

配置 <config.py>

  1. HOST = '127.0.0.1'
  2. PORT = '3306'
  3. DATABASE = 'appstore'
  4. USERNAME = 'root'
  5. PASSWORD = '1234567890'
  6. DB_URI = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}".format(
  7.  username=USERNAME, password=PASSWORD, host=HOST, port=PORT, db=DATABASE)
  8. SQLALCHEMY_DATABASE_URI = DB_URI
  9. SQLALCHEMY_TRACK_MODIFICATIONS = False
  10. SQLALCHEMY_ECHO = True

<base.py>

  1. from flask import Flask
  2. from flask_sqlalchemy import SQLAlchemy
  3. import config
  4. app = Flask(__name__)
  5. app.config.from_object(config) # 可以使用db.init_app(app),也可以在定义db的时候将app作为入参,例如:db = SQLAlchemy(app)
  6. # 需要注意将参数引用写在db = SQLAlchemy(app)上面db = SQLAlchemy(app)

2)创建“表类”

<models.py>

  1. from base import db, app
  2. class Dbuser(db.Model):
  3.     __tablename__ = 'dbuser'
  4.     id = db.Column(db.Integer, primary_key=True, autoincrement=True)
  5.     username = db.Column(db.String(50))
  6.     email = db.Column(db.String(50))
  7. # Model 是一个元类,在继承这个类来声明子类的时候,会把表模型注册到 sqlalchemy 里,# 所以在 create_all 之前必须把模型的声明导入进来db.create_all(app=app)

说明:可以通过ForeignKey指定外键

3)插入数据

  1. from base import db
  2. from models import Dbuser
  3. # 插入数据
  4. db.session.add(Dbuser(username='aaaa', email='aaaa@163.com'))
  5. db.session.add(Dbuser(username='bbbb', email='bbbb@163.com'))
  6. db.session.commit()

4)删除数据

  1. from base import db
  2. from models import Dbuser
  3. Dbuser.query.filter_by(username='Chen').delete()
  4. db.session.commit()

5)修改数据

  1. from base import db
  2. from models import Dbuser
  3. Dbuser.query.filter(Dbuser.username=='Li').update({'email':'wangbadan@qq.com'})
  4. db.session.commit()

6)查询数据

  1. from models import Dbuser
  2. from sqlalchemy import or_
  3. # 普通查询
  4. record = Dbuser.query.filter_by(username='Chen').first()
  5. print(record.username)
  6. # 带and条件的查找
  7. record = Dbuser.query.filter_by(username='Chen', email='Chen@163.com').first()
  8. print(record.username)
  9. # 带or条件的查找
  10. record = Dbuser.query.filter(or_(Dbuser.username=='Chen',Dbuser.email=='Chen@163.com')).first()
  11. print(record.username)
  12. # 注意写法,同样有and_,对于filter_by,其参数即为对应表类的属性,当使用or_时候需要指定类,并且使用“==
  13. # 特殊复杂的查找
  14. res_list = Dbuser.query.filter(Dbuser.email.endswith('@163.com')).all()
  15. for res in res_list: print(res.username)
  16. # 类似的还有notlike
  17. res_list = Dbuser.query.filter(Dbuser.email.like('%163.com')).all()
  18. for res in res_list: print(res.username)
  19. # 类似的还有notin_
  20. res_list = Dbuser.query.filter(Dbuser.username.in_(['Chen''Li'])).all()
  21. for res in res_list: print(res.username)
  22. # 计数
  23. count = Dbuser.query.filter(Dbuser.username.in_(['Chen''Li'])).count()
  24. print(count)
  25. # 排序
  26. res_list = Dbuser.query.filter(Dbuser.username.in_(['Chen''Li'])).order_by(Dbuser.username.desc())
  27. for res in res_list: print(res.username)
  28. # 切片
  29. res_list = Dbuser.query.all()[:5]
  30. for res in res_list: print(res.username)

注意filter和filter_by的区别,filter_by()将字段作为关键字参数传入,例如:filter_by(name=name, id=id),filter()需要指定类的属性,其需要使用“==”作为条件关联,例如:filter(cls.id==id),一般建议使用filter,因为其可以结合func,功能更强大

7)关联jion

支持“join”和“outerjoin”,如下所示:

# 如下写法因为and前的“UserOrgRel.user_no==UserRoleRel.user_no”为真,所以and后的直接pass掉了

res_list = UserOrgRel.query.filter_by(user_no="chen", status=1).outerjoin(

    UserRoleRel, UserOrgRel.user_no==UserRoleRel.user_no and UserRoleRel.status==1)

注意:当join on包含多个条件时候在sql中是直接通过and连接条件,但是在python中and的用法与其不一致,(条件 and 值1 or 值2)

  1. from sqlalchemy import and_
  2. res_list = UserOrgRel.query.filter_by(user_no="chen"status=1).outerjoin(UserRoleRel, and_(UserOrgRel.user_no==UserRoleRel.user_no, UserRoleRel.status==1))

语法:join(表, 条件)

查询指定列:

result = StuModel.query.with_entities(StuModel.id) # 返回BaseQuery, 别名使用 label

8)执行sql

如果对sql比较熟悉,可以直接执行sql,如下:

  1. from base import db
  2. sql = "select * from dbuser where username = 'Li'"
  3. for res in db.session.execute(sql):
  4. print(res)
  5. sql = """insert into dbuser (username, email) values ('Chenduanyun', 'Chenduanyun1216@163.com')"""
  6. db.session.execute(sql)
  7. db.session.commit()
  8. # 分页操作
  9. obj = obj.paginate(page, page_size, error_out=False)

其中page为页码,page_size为每页显示数量,需要注意在接受requests参数时候需要将page和page_size转换成整数,此外获取结果需要使用items

9)相关总结

  1. class PrcDetail():
  2.  """返回流程节点相关明细数据"""
  3.  @staticmethod
  4.  def get_link_role(link_id):
  5.   res_data = BizFlowNodeOwner.query.with_entities(
  6.    func.concat('Link_', BizFlowNodeOwner.biz_flow_node_id).label("link_id"),
  7.    BizRole.id.label("role_id"),
  8.    BizRole.role_name
  9.   ).filter(func.concat("Link_", BizFlowNodeOwner.id)==link_id).join(
  10.    BizRole, BizFlowNodeOwner.role_id==BizRole.id
  11.   ).group_by(
  12.    BizFlowNodeOwner.biz_flow_node_id, BizRole.role_name, BizRole.id
  13.   ).all()
  14.   if not res_data:
  15.    return []
  16.   res_list = []
  17.   for item in res_data:
  18.    res = {
  19.     "link_id": item.link_id,
  20.     "role_id": item.role_id,
  21.     "role_name": item.role_name
  22.    }
  23.   res_list.append(res)
  24.   return res_list

① sql中的一些方法可以调用func,from sqlalchemy import func

② sql中as定义别名,在sqlalchemy中使用“label”

③ first() 返回一行数据,all() 返回全量数据为一个列表,其返回的是一个结构体,在对结构体进行操作前建议先判断是否为空,防止抛出异常

④ 查询特定字段使用with_entities方法

⑤ 表别名可以使用aliased,例如:

  1. from sqlalchemy.orm import aliased
  2. a = aliased(BizFlowNodeOwner) # 使用a代表表BizFlowNodeOwner

10)api

1、limit():可以限制每次查询的时候只查询几条数据

2、offset():可以限制查找数据的时候过滤掉前面多少条

3、切片:可以对Query对象使用切片操作,可以使用`slice(start,stop)`方法来做切片操作,也可以使用`[start:stop]`的方式来进行切片操作

4、all():返回的是一个列表

5、first():返回第一个结果,如果没有则返回None

6、first_or_404():返回第一个结果,如果没有则抛出404异常

7、get():返回主键对应记录,没有则返回None

8、get_or_404():返回主键对应记录,没有则抛出404异常

9、count():返回查询结果数量,切记勿使用len(obj.all())来计算长度,其非常耗费资源,同样也可以使用total

10、paginate(page, page_size):返回paginate对象,此对象用于分页,需要注意这种分页查询和first和all不同,其需要使用items用来返回,例如

res = cls.query.filter(id==1).paginate(page=1, page_size=10)

return [item.format() for item in res.items]

11、order_by():排序,可以使用desc进行倒序排列,例如:query.order_by(cls.id.desc())

12、group_by():分组统计

13、func模块:其相关方法基本和sql中保持一致,例如sum、count、concat、coalesce、ifnull等

补充:在sql中的row_number over()、rank() over()等可以按照如下方法使用:

  1. res_data = bf.query.with_entities(
  2.  func.row_number().over(partition_by=bf.biz_flow_name, order_by=(bf.id.desc(), bf.biz_flow_name.asc())),bf.biz_flow_name
  3. )

14、case when的使用方法:

  1. from sqlalchemy import func, case
  2. bf = aliased(BizFlow)
  3. res_data = bf.query.with_entities(
  4.     case(whens=[(bf.id==1"One"), (bf.id==2"Two")], else_="Others"),
  5.     bf.biz_flow_name)

补充:如果when条件又多个可以使用 and or 等进行连接

15、子查询exists

  1. from sqlalchemy import exists
  2. res_data = BizFlow.query.filter(
  3.     exists().where(BizFlowNode.biz_flow_id != BizFlow.id))

补充:not exists可以在前面加上not_,例如:

not_(exists().where(条件))

16、not_、or_、like、in_ 等其他基本和sql使用方式保持一致

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/510814
推荐阅读
相关标签
  

闽ICP备14008679号