赞
踩
pip install flask-sqlalchemy
配置选项 | 说明 |
---|---|
SQLALCHEMY_DATABASE_URI | 连接数据库(mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8) |
SQLALCHEMY_ECHO | 调试选项 |
SQLALCHEMY_POOL_SIZE | 数据库池的大小默认为5 |
SQLALCHEMY_TRACK_MODIFICATIONS | 追踪对象的修改并发送信号 |
操作数据库首先要创建一个db对象,通常卸载exts.py文件中。
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
数据库配置我们一般卸载config.py文件里。
# 数据库配置
HOSTNAME = '127.0.0.1'
PORT = '3306'
DATABASE = 'web_v1'
USERNAME = 'root'
PASSWORD = 'root'
DB_URI = 'mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8'.\
format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)
SQLALCHEMY_DATABASE_URI = DB_URI
SQLALCHEMY_TRACK_MODIFICATIONS = True
SQLALCHEMY_ECHO = True
写完数据库配置之后需要和app绑定,如下所示:
from flask import Flask
import configs
from exts import db
app = Flask(__name__)
# 加载配置文件
app.config.from_object(configs)
# db绑定app
db.init_app(app)
数据类型 | 说明 |
---|---|
Integer | 整型 |
String | 字符串 |
Text | 文本 |
DateTime | 日期 |
Float | 浮点型 |
Boolean | 布尔值 |
PickleType | 存储一个序列化后的Python对象 |
LargeBinary | 巨长度的二进制数据 |
首先,需要创建一个模型对象,我们一般存放在model.py中。
表关系
from exts import db # 用户表 class User(db.Model): __tablename__ = 'user' id = db.Column(db.Integer, primary_key=True, autoincrement=True) username = db.Column(db.String(50), nullable=False, unique=True) email = db.Column(db.String(50), nullable=False, unique=True) # 关系表(多对多) article_tag_table = db.Table('article_tag', db.Column('article_id', db.Integer, db.ForeignKey('article.id'), primary_key=True), db.Column('tag_id', db.Integer, db.ForeignKey('tag.id'), primary_key=True)) # 文章表 class Article(db.Model): __tablename__ = 'article' id = db.Column(db.Integer, primary_key=True, autoincrement=True) title = db.Column(db.String(100)) content = db.Column(db.Text) author_id = db.Column(db.Integer, db.ForeignKey('user.id')) author = db.relationship("User", backref="articles") tags = db.relationship("Tag", secondary=article_tag_table, backref='tags') # 标签表 class Tag(db.Model): __tablename__ = 'tag' id = db.Column(db.Integer, primary_key=True, autoincrement=True) name = db.Column(db.String(50))
创建好表之后要将表映射到数据库中。
from flask_migrate import Migrate
migrate = Migrate(app, db)
在命令行运行如下指令:
:::tips
flask db inite
flask db migrate
flask db upgrade
:::
映射成功后可以在Navicat Premium中查看各个表。
from flask import Flask import configs from exts import db from model import User from flask_migrate import Migrate app = Flask(__name__) # 加载配置文件 app.config.from_object(configs) # db绑定app db.init_app(app) migrate = Migrate(app, db) # 注意上下文 with app.app_context(): with db.engine.connect() as conn: rs = conn.execute("select 1") print(rs.fetchone()) @app.route('/') def hello_world(): # put application's code here return 'Hello World!' if __name__ == '__main__': app.run()
如果成功连接会看到返回(1,)
增加一行数据的步骤:
@app.route('/insert/')
def insert(): # put application's code here
user = User(username='mark', email='123456@qq.com')
db.session.add(user)
db.session.commit()
return 'success'
执行完代码之后就可以在Navicat Premium查看数据。
首先,在类User中加入打印的格式。
# 用户表
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(50), nullable=False, unique=True)
email = db.Column(db.String(50), nullable=False, unique=True)
def __repr__(self): # 自定义 交互模式 & print() 的对象打印
return "(%s, %s, %s)" % (self.id, self.username, self.email)
加入一些测试用的数据。
@app.route('/init/') def init_data(): # put application's code here user1 = User(username='wang', email='wang@163.com') user2 = User(username='luyao', email='luyao@189.com') user3 = User(username='qiqi', email='qiqi@126.com') user4 = User(username='jiji', email='jiji@163.com') user5 = User(username='kaikai', email='kaikai@itheima.com') user6 = User(username='dahai', email='dahai@gmail.com') user7 = User(username='guishi', email='guishi@gmail.com') user8 = User(username='shenmo', email='shenmo@itheima.com') user9 = User(username='guijianchou', email='guijianchou@163.com') user10 = User(username='goujianchou', email='goujianchou@163.com') # 一次添加多条数据 db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10]) db.session.commit() return 'success'
查询所有用户数据:
@app.route('/Search/')
def search(): # put application's code here
print(User.query.all())
return f'find it!'
:::info
[(1, mark, 123456@qq.com), (3, wang, wang@163.com), (4, luyao, luyao@189.com), (5, qiqi, qiqi@126.com), (6, jiji, jiji@163.com), (7, kaikai, kaikai@itheima.com), (8, dahai, dahai@gmail.com), (9, guishi, guishi@gmail.com), (10, shenmo, shenmo@itheima.com), (11, guijianchou, guijianchou@163.com), (12, goujianchou, goujianchou@163.com)]
:::
查询有多少个用户:
@app.route('/Search/')
def search(): # put application's code here
print(User.query.count())
return f'find it!'
:::info
11
:::
查询第一个用户:
@app.route('/Search/')
def search(): # put application's code here
print(User.query.first())
return f'find it!'
:::info
(1, mark, 123456@qq.com)
:::
查询id为4的用户:
# 方式1: 根据id查询 返回模型对象/None
User.query.get(4)
# 方式2: 等值过滤器 关键字实参设置字段值 返回BaseQuery对象
# BaseQuery对象可以续接其他过滤器/执行器 如 all/count/first等
User.query.filter_by(id=4).all()
# 方式3: 复杂过滤器 参数为比较运算/函数引用等 返回BaseQuery对象
User.query.filter(User.id == 4).first()
@app.route('/Search/')
def search(): # put application's code here
print(User.query.get(4))
# print(User.query.filter_by(id=4).all())
# print(User.query.filter(User.id == 4).first()User.query.get(4))
return f'find it!'
:::info
(4, luyao, luyao@189.com)
:::
一些特殊的查找条件:
# 查询名字结尾字符为g的所有用户[开始 / 包含] User.query.filter(User.name.endswith("g")).all() User.query.filter(User.name.startswith("w")).all() User.query.filter(User.name.contains("n")).all() User.query.filter(User.name.like("w%n%g")).all() # 模糊查询 # 查询名字和邮箱都以li开头的所有用户[2种方式] User.query.filter(User.name.startswith('li'), User.email.startswith('li')).all() from sqlalchemy import and_ User.query.filter(and_(User.name.startswith('li'), User.email.startswith('li'))).all() # 查询age是25 或者 `email`以`itheima.com`结尾的所有用户 from sqlalchemy import or_ User.query.filter(or_(User.age==25, User.email.endswith("itheima.com"))).all() # 查询名字不等于wang的所有用户[2种方式] from sqlalchemy import not_ User.query.filter(not_(User.name == 'wang')).all() User.query.filter(User.name != 'wang').all() # 查询id为[1, 3, 5, 7, 9]的用户 User.query.filter(User.id.in_([1, 3, 5, 7, 9])).all() # 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个 User.query.order_by(User.age, User.id.desc()).limit(5).all() # 查询年龄从小到大第2-5位的数据 2 3 4 5 User.query.order_by(User.age).offset(1).limit(4).all() # 分页查询, 每页3个, 查询第2页的数据 paginate(页码, 每页条数) pn = User.query.paginate(2, 3) pn.pages 总页数 pn.page 当前页码 pn.items 当前页的数据 pn.total 总条数 # 查询每个年龄的人数 select age, count(name) from t_user group by age 分组聚合 from sqlalchemy import func data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all() for item in data: # print(item[0], item[1]) print(item.age, item.count) # 建议通过label()方法给字段起别名, 以属性方式获取数据 # 只查询所有人的姓名和邮箱 优化查询 User.query.all() # 相当于select * from sqlalchemy.orm import load_only data = User.query.options(load_only(User.name, User.email)).all() # flask-sqlalchem的语法 for item in data: print(item.name, item.email) data = db.session.query(User.name, User.email).all() # sqlalchemy本体的语法 for item in data: print(item.name, item.email)
主要有两种方案:
@app.route('/del/')
def delete():
# 方式1: 先查后删除
user = User.query.filter(User.username == 'luyao').first()
# 删除数据
db.session.delete(user)
# 提交会话 增删改都要提交会话
db.session.commit()
return "success"
@app.route('/del/')
def delete():
User.query.filter(User.username == 'jiji').delete()
# 提交会话 增删改都要提交会话
db.session.commit()
return "success"
@app.route('/init/') def init_data(): # put application's code here user1 = User(username='wang', email='wang@163.com') user2 = User(username='luyao', email='luyao@189.com') user3 = User(username='qiqi', email='qiqi@126.com') user4 = User(username='jiji', email='jiji@163.com') user5 = User(username='kaikai', email='kaikai@itheima.com') user6 = User(username='dahai', email='dahai@gmail.com') user7 = User(username='guishi', email='guishi@gmail.com') user8 = User(username='shenmo', email='shenmo@itheima.com') user9 = User(username='guijianchou', email='guijianchou@163.com') user10 = User(username='goujianchou', email='goujianchou@163.com') # 一次添加多条数据 db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10]) # 主动执行flush操作, 立即执行SQL操作(数据库同步) db.session.flush() db.session.commit() return 'success'
首先,可以通过如下所示的方法,生成一对多的表示例:
@app.route('/init/')
def init_data(): # put application's code here
user = User(username='huihui', email='huihui@163.com')
# 一次添加多条数据
db.session.add(user)
db.session.flush() # 需要手动执行flush操作, 让主表生成主键, 否则外键关联失败
adr1 = Address(detail='中关村3号', user_id=user.id)
adr2 = Address(detail='华强北5号', user_id=user.id)
db.session.add_all([adr1, adr2])
db.session.commit()
return 'success'
关联查询
@app.route('/demo/')
def demo():
"""查询多表数据 需求: 查询姓名为"张三"的所有地址信息"""
# 1.先根据姓名查找到主表主键
user1 = User.query.filter_by(username='huihui').first()
# 2.再根据主键到从表查询关联地址
adrs = Address.query.filter_by(user_id=user1.id).all()
for adr in adrs:
print(adr.detail)
return "demo"
关系属性
关系属性是 sqlalchemy 封装的一套查询关联数据的语法, 其目的为 让开发者使用 面向对象的形式 方便快捷的获取关联数据。
from flask import Flask from flask_migrate import Migrate from model import User, Address import configs from exts import db app = Flask(__name__) # 加载配置文件 app.config.from_object(configs) # db绑定app db.init_app(app) migrate = Migrate(app, db) @app.route('/') def hello_world(): # put application's code here return 'Hello World!' # @app.route('/init/') # def init_data(): # put application's code here # user1 = User(username='wang', email='wang@163.com') # user2 = User(username='luyao', email='luyao@189.com') # user3 = User(username='qiqi', email='qiqi@126.com') # user4 = User(username='jiji', email='jiji@163.com') # user5 = User(username='kaikai', email='kaikai@itheima.com') # user6 = User(username='dahai', email='dahai@gmail.com') # user7 = User(username='guishi', email='guishi@gmail.com') # user8 = User(username='shenmo', email='shenmo@itheima.com') # user9 = User(username='guijianchou', email='guijianchou@163.com') # user10 = User(username='goujianchou', email='goujianchou@163.com') # # # 一次添加多条数据 # db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10]) # db.session.commit() # return 'success' @app.route('/init/') def init_data(): # put application's code here user = User(username='huihui', email='huihui@163.com') # 一次添加多条数据 db.session.add(user) db.session.flush() # 需要手动执行flush操作, 让主表生成主键, 否则外键关联失败 adr1 = Address(detail='中关村3号', user_id=user.id) adr2 = Address(detail='华强北5号', user_id=user.id) db.session.add_all([adr1, adr2]) db.session.commit() return 'success' @app.route('/demo/') def demo(): """查询多表数据 需求: 查询姓名为"张三"的所有地址信息""" # 先根据姓名查找用户主键 user1 = User.query.filter_by(username='huihui').first() # 3.使用关系属性获取关系数据 for address in user1.addresses: print(address.detail) return "demo" @app.route('/Search/') def search(): # put application's code here print(User.query.get(4)) return f'find it!' @app.route('/del/') def delete(): User.query.filter(User.username == 'jiji').delete() # 提交会话 增删改都要提交会话 db.session.commit() return "success" if __name__ == '__main__': app.run()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。