赞
踩
配置 <config.py>
- HOST = '127.0.0.1'
- PORT = '3306'
- DATABASE = 'appstore'
- USERNAME = 'root'
- PASSWORD = '1234567890'
-
- DB_URI = "mysql+pymysql://{username}:{password}@{host}:{port}/{db}".format(
- username=USERNAME, password=PASSWORD, host=HOST, port=PORT, db=DATABASE)
-
- SQLALCHEMY_DATABASE_URI = DB_URI
- SQLALCHEMY_TRACK_MODIFICATIONS = False
- SQLALCHEMY_ECHO = True
<base.py>
- from flask import Flask
- from flask_sqlalchemy import SQLAlchemy
- import config
-
- app = Flask(__name__)
- app.config.from_object(config) # 可以使用db.init_app(app),也可以在定义db的时候将app作为入参,例如:db = SQLAlchemy(app)
-
- # 需要注意将参数引用写在db = SQLAlchemy(app)上面db = SQLAlchemy(app)
<models.py>
- from base import db, app
-
- class Dbuser(db.Model):
- __tablename__ = 'dbuser'
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- username = db.Column(db.String(50))
- email = db.Column(db.String(50))
-
- # Model 是一个元类,在继承这个类来声明子类的时候,会把表模型注册到 sqlalchemy 里,# 所以在 create_all 之前必须把模型的声明导入进来db.create_all(app=app)
说明:可以通过ForeignKey指定外键
- from base import db
- from models import Dbuser
-
- # 插入数据
- db.session.add(Dbuser(username='aaaa', email='aaaa@163.com'))
- db.session.add(Dbuser(username='bbbb', email='bbbb@163.com'))
- db.session.commit()
- from base import db
- from models import Dbuser
-
- Dbuser.query.filter_by(username='Chen').delete()
- db.session.commit()
- from base import db
- from models import Dbuser
-
- Dbuser.query.filter(Dbuser.username=='Li').update({'email':'wangbadan@qq.com'})
- db.session.commit()
- from models import Dbuser
- from sqlalchemy import or_
-
- # 普通查询
- record = Dbuser.query.filter_by(username='Chen').first()
- print(record.username)
-
- # 带and条件的查找
- record = Dbuser.query.filter_by(username='Chen', email='Chen@163.com').first()
- print(record.username)
-
- # 带or条件的查找
- record = Dbuser.query.filter(or_(Dbuser.username=='Chen',Dbuser.email=='Chen@163.com')).first()
- print(record.username)
-
- # 注意写法,同样有and_,对于filter_by,其参数即为对应表类的属性,当使用or_时候需要指定类,并且使用“==”
-
- # 特殊复杂的查找
- res_list = Dbuser.query.filter(Dbuser.email.endswith('@163.com')).all()
- for res in res_list: print(res.username)
-
- # 类似的还有notlike
- res_list = Dbuser.query.filter(Dbuser.email.like('%163.com')).all()
- for res in res_list: print(res.username)
-
- # 类似的还有notin_
- res_list = Dbuser.query.filter(Dbuser.username.in_(['Chen', 'Li'])).all()
- for res in res_list: print(res.username)
-
- # 计数
- count = Dbuser.query.filter(Dbuser.username.in_(['Chen', 'Li'])).count()
- print(count)
-
- # 排序
- res_list = Dbuser.query.filter(Dbuser.username.in_(['Chen', 'Li'])).order_by(Dbuser.username.desc())
- for res in res_list: print(res.username)
-
- # 切片
- res_list = Dbuser.query.all()[:5]
- for res in res_list: print(res.username)
注意filter和filter_by的区别,filter_by()将字段作为关键字参数传入,例如:filter_by(name=name, id=id),filter()需要指定类的属性,其需要使用“==”作为条件关联,例如:filter(cls.id==id),一般建议使用filter,因为其可以结合func,功能更强大
支持“join”和“outerjoin”,如下所示:
# 如下写法因为and前的“UserOrgRel.user_no==UserRoleRel.user_no”为真,所以and后的直接pass掉了
res_list = UserOrgRel.query.filter_by(user_no="chen", status=1).outerjoin(
UserRoleRel, UserOrgRel.user_no==UserRoleRel.user_no and UserRoleRel.status==1)
注意:当join on包含多个条件时候在sql中是直接通过and连接条件,但是在python中and的用法与其不一致,(条件 and 值1 or 值2)
- from sqlalchemy import and_
-
- res_list = UserOrgRel.query.filter_by(user_no="chen", status=1).outerjoin(UserRoleRel, and_(UserOrgRel.user_no==UserRoleRel.user_no, UserRoleRel.status==1))
语法:join(表, 条件)
查询指定列:
result = StuModel.query.with_entities(StuModel.id) # 返回BaseQuery, 别名使用 label
如果对sql比较熟悉,可以直接执行sql,如下:
- from base import db
-
- sql = "select * from dbuser where username = 'Li'"
- for res in db.session.execute(sql):
- print(res)
-
- sql = """insert into dbuser (username, email) values ('Chenduanyun', 'Chenduanyun1216@163.com')"""
- db.session.execute(sql)
- db.session.commit()
-
- # 分页操作
- obj = obj.paginate(page, page_size, error_out=False)
其中page为页码,page_size为每页显示数量,需要注意在接受requests参数时候需要将page和page_size转换成整数,此外获取结果需要使用items
- class PrcDetail():
-
- """返回流程节点相关明细数据"""
-
- @staticmethod
- def get_link_role(link_id):
- res_data = BizFlowNodeOwner.query.with_entities(
- func.concat('Link_', BizFlowNodeOwner.biz_flow_node_id).label("link_id"),
- BizRole.id.label("role_id"),
- BizRole.role_name
- ).filter(func.concat("Link_", BizFlowNodeOwner.id)==link_id).join(
- BizRole, BizFlowNodeOwner.role_id==BizRole.id
- ).group_by(
- BizFlowNodeOwner.biz_flow_node_id, BizRole.role_name, BizRole.id
- ).all()
-
- if not res_data:
- return []
-
- res_list = []
- for item in res_data:
- res = {
- "link_id": item.link_id,
- "role_id": item.role_id,
- "role_name": item.role_name
- }
-
- res_list.append(res)
- return res_list
① sql中的一些方法可以调用func,from sqlalchemy import func
② sql中as定义别名,在sqlalchemy中使用“label”
③ first() 返回一行数据,all() 返回全量数据为一个列表,其返回的是一个结构体,在对结构体进行操作前建议先判断是否为空,防止抛出异常
④ 查询特定字段使用with_entities方法
⑤ 表别名可以使用aliased,例如:
- from sqlalchemy.orm import aliased
-
- a = aliased(BizFlowNodeOwner) # 使用a代表表BizFlowNodeOwner
1、limit():可以限制每次查询的时候只查询几条数据
2、offset():可以限制查找数据的时候过滤掉前面多少条
3、切片:可以对Query对象使用切片操作,可以使用`slice(start,stop)`方法来做切片操作,也可以使用`[start:stop]`的方式来进行切片操作
4、all():返回的是一个列表
5、first():返回第一个结果,如果没有则返回None
6、first_or_404():返回第一个结果,如果没有则抛出404异常
7、get():返回主键对应记录,没有则返回None
8、get_or_404():返回主键对应记录,没有则抛出404异常
9、count():返回查询结果数量,切记勿使用len(obj.all())来计算长度,其非常耗费资源,同样也可以使用total
10、paginate(page, page_size):返回paginate对象,此对象用于分页,需要注意这种分页查询和first和all不同,其需要使用items用来返回,例如
res = cls.query.filter(id==1).paginate(page=1, page_size=10)
return [item.format() for item in res.items]
11、order_by():排序,可以使用desc进行倒序排列,例如:query.order_by(cls.id.desc())
12、group_by():分组统计
13、func模块:其相关方法基本和sql中保持一致,例如sum、count、concat、coalesce、ifnull等
补充:在sql中的row_number over()、rank() over()等可以按照如下方法使用:
- res_data = bf.query.with_entities(
-
- func.row_number().over(partition_by=bf.biz_flow_name, order_by=(bf.id.desc(), bf.biz_flow_name.asc())),bf.biz_flow_name
-
- )
14、case when的使用方法:
- from sqlalchemy import func, case
-
- bf = aliased(BizFlow)
-
- res_data = bf.query.with_entities(
-
- case(whens=[(bf.id==1, "One"), (bf.id==2, "Two")], else_="Others"),
-
- bf.biz_flow_name)
补充:如果when条件又多个可以使用 and or 等进行连接
15、子查询exists
- from sqlalchemy import exists
-
- res_data = BizFlow.query.filter(
-
- exists().where(BizFlowNode.biz_flow_id != BizFlow.id))
补充:not exists可以在前面加上not_,例如:
not_(exists().where(条件))
16、not_、or_、like、in_ 等其他基本和sql使用方式保持一致
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。