赞
踩
- from flask import Flask
- from flask_sqlalchemy import SQLAlchemy
-
- # create the extension
- db = SQLAlchemy()
-
- # create the app
- app = Flask(__name__)
-
- # configure the SQLite database, relative to the app instance folder
- app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///project.db"
-
- # initialize the app with the extension
- db.init_app(app)
上面代码中 db 对象 ( 就是 SQLAlchemy 实例 )
2 种创建方式
- # 导入扩展包flask_sqlalchemy
- from flask_sqlalchemy import SQLAlchemy
-
- # 方法 1
- # 直接实例化sqlalchemy对象,传⼊app
- db = SQLAlchemy(app)
-
- # 方法 2
- # 通过⼿动调⽤初始话app的函数
- db = SQLAlchemy()
- db.init_app(app)
在单独运行调试时,对数据库操作需要在Flask的应用上下文中进行
- with app.app_context():
- User.query.all()
示例
- from flask import Flask
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
- from sqlalchemy import Column, Integer, CHAR, text
-
- app = Flask(__name__)
- # 配置数据库连接地址
- app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:root@127.0.0.1:3306/world'
- app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
-
- db = SQLAlchemy(app)
-
-
- class City(db.Model):
- __tablename__ = 'city'
- # id: Mapped[int] = mapped_column(db.Integer, primary_key=True)
- # username: Mapped[str] = mapped_column(db.String, unique=True, nullable=False)
-
- ID = db.Column(db.Integer, primary_key=True)
- Name = db.Column(CHAR(35), nullable=False)
- CountryCode = db.Column(CHAR(3), nullable=False)
- District = db.Column(CHAR(20), nullable=False)
- Population = db.Column(db.Integer, nullable=False)
-
-
- def main_1():
- with app.app_context():
- # 不通过 session 进行查询
- result = City.query.all()
- for item in result:
- print(f"{item.ID} {item.Name} {item.CountryCode} {item.District} {item.Population}")
-
-
- def main_2():
- with app.app_context():
- # db.create_all()
- # db.session.add(User(username="example"))
- # db.session.commit()
- # 通过 session 进行查询
- result = db.session.execute(db.select(City)).all()
- for item in result:
- temp_1 = item._asdict()
- print(f'temp_1 ---> {temp_1}')
- temp_2 = item.tuple()[0]
- print(f'temp_2 ---> {temp_2}')
-
-
- def main_3():
- with app.app_context():
- # 通过 session 使用 原始 sql 语句进行查询
- result_2 = db.session.execute(text('select * from city'))
- for item in result_2:
- data_dict = item._asdict()
- print(data_dict)
-
-
- if __name__ == '__main__':
- # main_1()
- # main_2()
- main_3()
How to convert SQLAlchemy row object to a Python dict:https://stackoverflow.com/questions/1958219/how-to-convert-sqlalchemy-row-object-to-a-python-dict
将sqlalchemy行对象转换为python dict:https://www.codenong.com/1958219/
Postgres: postgresql://user:password@localhost/mydatabase
MySQL: mysql://user:password@localhost/mydatabase
Oracle: oracle://user:password@127.0.0.1:1521/sidname
SQLite: sqlite:absolute/path/to/foo.db
在 Flask 应用中引入相关模块和配置数据库连接。
配置参数放在Flask的应用配置 app.config 中
示例 1:
- from flask import Flask
- app = Flask(__name__)
-
- # 定义配置对象
- class Config(object):
- SQLALCHEMY_DATABASE_URI = 'mysql://root:mysql@127.0.0.1:3306/db'
- SQLALCHEMY_TRACK_MODIFICATIONS = False
- SQLALCHEMY_ECHO = True
-
- app.config.from_object(Config)
示例 2:
- from flask import Flask
- app = Flask(__name__)
-
- # 配置数据库的连接信息
- app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/db'
- # 关闭动态追踪修改的警告信息
- app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
- # 展示sql语句
- app.config['SQLALCHEMY_ECHO'] = True
配置参数说明
名字 | 备注 |
---|---|
SQLALCHEMY_DATABASE_URI | 连接数据库URI |
SQLALCHEMY_BINDS | 一个映射 binds 到连接 URI 的字典。更多 binds 的信息见用 Binds 操作多个数据库 |
SQLALCHEMY_ECHO | 如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,对调试有用 |
SQLALCHEMY_RECORD_QUERIES | 可以用于显式地禁用或启用查询记录。查询记录 在调试或测试模式自动启用。更多信息见get_debug_queries() |
SQLALCHEMY_NATIVE_UNICODE | 可以用于显式禁用原生 unicode 支持。当使用 不合适的指定无编码的数据库默认值时,这对于 一些数据库适配器是必须的(比如 Ubuntu 上 某些版本的 PostgreSQL ) |
SQLALCHEMY_POOL_SIZE | 数据库连接池的大小。默认是引擎默认值(通常 是 5 ) |
SQLALCHEMY_POOL_TIMEOUT | 设定连接池的连接超时时间。默认是 10 |
SQLALCHEMY_POOL_RECYCLE | 多少秒后自动回收连接。这对 MySQL 是必要的, 它默认移除闲置多于 8 小时的连接。注意如果 使用了 MySQL , Flask-SQLALchemy 自动设定 这个值为 2 小时 |
ORM 模型类参数说明
字段类型
类型名 | python中类型 | 说明 |
---|---|---|
Integer | int | 普通整数,一般是32位 |
SmallInteger | int | 取值范围小的整数,一般是16位 |
BigInteger | int或long | 不限制精度的整数 |
Float | float | 浮点数 |
Numeric | decimal.Decimal | 普通整数,一般是32位 |
String | str | 变长字符串 |
Text | str | 变长字符串,对较长或不限长度的字符串做了优化 |
Unicode | unicode | 变长Unicode字符串 |
UnicodeText | unicode | 变长Unicode字符串,对较长或不限长度的字符串做了优化 |
Boolean | bool | 布尔值 |
Date | datetime.date | 时间 |
Time | datetime.datetime | 日期和时间 |
LargeBinary | str | 二进制文件 |
列选项
选项名 | 说明 |
---|---|
primary_key | 如果为True,代表表的主键 |
unique | 如果为True,代表这列不允许出现重复的值 |
index | 如果为True,为这列创建索引,提高查询效率 |
nullable | 如果为True,允许有空值,如果为False,不允许有空值 |
default | 为这列定义默认值 |
关系选项
选项名 | 说明 |
---|---|
backref | 在关系的另一模型中添加反向引用 |
primaryjoin | 明确指定两个模型之间使用的联结条件 |
uselist | 如果为False,不使用列表,而使用标量值 |
order_by | 指定关系中记录的排序方式 |
secondary | 指定多对多关系中关系表的名字 |
secondary join | 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件 |
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- username = db.Column(db.String, unique=True, nullable=False)
- email = db.Column(db.String)
如果没有设置
__tablename__
属性,根据默认规则,表名将会是类名小写形式的"user"
。如果想要设置自定义的表名,可以在模型类中显式地定义
__tablename__
属性定义、创建 models and tables:https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/models/
定义所有模型和表后,调用以创建 数据库中的表架构。这需要应用程序上下文。因为你不是 此时,在请求中,手动创建一个。
所有的 models 和 tables 被定义后,就可以在 "应用上下文(application context)" 中,调用 SQLAlchemy.create_all() 在数据库中创建 table schema。
- with app.app_context():
- db.create_all()
如果在其他模块中定义的有 models ,则必须在调用 create_all
之前导入它们,否则 SQLAlchemy 将不知道它们。
如果表已经存在数据库中,则 create_all
不会更新表。如果要更改 model 的 列,则可以使用迁移库( 如带有 Flask-Alembic 或 Flask-Migrate 的 Alembic)来生成更新数据库架构的迁移。
在 Flask 视图函数中,或者 CLI 命令中,可以使用 db.session
执行 查询、修改 模型 数据。
SQLAlchemy 自动为每个模型定义一个 __init__
方法,该方法分配任何相应数据库列和其他属性的关键字参数。
示例:
- @app.route("/users")
- def user_list():
- users = db.session.execute(db.select(User).order_by(User.username)).scalars()
- return render_template("user/list.html", users=users)
-
- @app.route("/users/create", methods=["GET", "POST"])
- def user_create():
- if request.method == "POST":
- user = User(
- username=request.form["username"],
- email=request.form["email"],
- )
- db.session.add(user)
- db.session.commit()
- return redirect(url_for("user_detail", id=user.id))
-
- return render_template("user/create.html")
-
- @app.route("/user/<int:id>")
- def user_detail(id):
- user = db.get_or_404(User, id)
- return render_template("user/detail.html", user=user)
-
- @app.route("/user/<int:id>/delete", methods=["GET", "POST"])
- def user_delete(id):
- user = db.get_or_404(User, id)
-
- if request.method == "POST":
- db.session.delete(user)
- db.session.commit()
- return redirect(url_for("user_list"))
-
- return render_template("user/delete.html", user=user)
旧的查询方法:Model.query
新的查询方法:db.session.execute(db.select(...))
查询:https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/queries/
- from flask import Flask
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy import Column, Integer, CHAR, text
- from dictalchemy import make_class_dictable
-
- app = Flask(__name__)
- # 配置数据库连接地址
- app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:root@127.0.0.1:3306/world'
- app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
-
- db = SQLAlchemy(app)
-
-
- class City(db.Model):
- __tablename__ = 'city'
-
- ID = db.Column(db.Integer, primary_key=True)
- Name = db.Column(CHAR(35), nullable=False)
- CountryCode = db.Column(CHAR(3), nullable=False)
- District = db.Column(CHAR(20), nullable=False)
- Population = db.Column(db.Integer, nullable=False)
-
-
- def main():
- make_class_dictable(City)
- with app.app_context():
- # 旧的查询方法:Model.query
- # result = City.query.all()
- # list(map(lambda x=None: print(x.asdict()), result))
- # 新的查询方法:db.session.execute(db.select(...))
- result = db.session.execute(db.select(City)).all()
- list(map(lambda x=None: print(x.tuple()[0].asdict()), result))
-
-
- if __name__ == '__main__':
- main()
SQLAlchemy.Model 声明 模型的基类。它会自动设置表名,而不需要 __tableame__
SQLAlchemy.session 作用域是 "Flask应用上下文"。每次请求后都会对其进行清理。
SQLAlchemy.metadata and SQLAlchemy.metadatas 允许访问每个配置中定义的元数据。
SQLAlchemy.engine and SQLAlchemy.engines 允许访问每个配置中定义的引擎。
SQLAlchemy.create_all() 创建所有 表
必须处于活动的 Flask 应用程序上下文中才能执行查询和访问 会话和引擎。
:https://juejin.cn/post/7239296984985288765
创建 User 类继承自 db.Model类,同时定义id、name、mobile、gender、....等属性,对应数据库中表user的列。
- class User(db.Model):
- __tablename__ = 'user'
-
- class GENDER:
- MALE = 0
- FEMALE = 1
-
- id = db.Column('user_id', db.Integer, primary_key=True, doc='用户ID')
- mobile = db.Column(db.String, doc='手机号')
- password = db.Column(db.String, doc='密码')
- name = db.Column('user_name', db.String, doc='昵称')
- gender = db.Column(db.Integer, default=GENDER.FEMALE, doc='性别')
- birthday = db.Column(db.Date, doc='生日')
- is_delete = db.Column(db.Boolean, default=False, doc='是否删除')
- # 当模型类字段与表字段不一致,可在Column函数第一个参数指定
- time = db.Column('create_time', db.DateTime, default=datetime.now, doc='创建时间')
- update_time = db.Column('update_time', db.DateTime, default=datetime.now, onupdate=datetime.now, doc='更新时间')
-
- # primaryjoin定义连接条件 : param1:另外一方类名 param2: 具体连接条件
- follows = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')
创建 Car类 继承自 db.Model类
- class Car(db.Model):
- __tablename__ = 'car'
-
- class TYPE:
- SUV = 0
- SEDAN = 1
- PICKUP = 2
-
- id = db.Column('car_id', db.Integer, primary_key=True, doc='主键ID')
- user_id = db.Column(db.Integer, doc='用户ID')
- type = db.Column(db.Integer, doc='类型')
- name = db.Column(db.String, doc='名称')
- price = db.Column(db.Numeric, default=0.00, doc='价格')
常用参数说明:
db.Model:所有模型类都应该继承自 db.Model。
__tablename__:指定模型类对应的数据库表名。如果不指定,则默认为类名的小写形式。
db.Column:用来定义模型类中的各个字段,需要指定字段类型。
primary_key=True:用来指定主键字段。
default:用来指定字段的默认值。
unique=True:用来指定字段的唯一性约束。
index=True:用来指定字段是否需要创建索引。
db.ForeignKey():用来定义外键关系。需要传入对应的表格的主键作为参数
db.relationship():用来定义模型之间的关系。第一个参数需要传入要关联的模型类名,第二个参数可以通过 backref 来指定反向引用
lazy:用来指定关系的加载方式,有两种常见的方式:
lazy=True:表示使用惰性加载,即在首次访问相关属性时才会加载数据。
lazy=False:表示立即加载,即在查询时同时加载相关数据。
可以手动创建数据库表,也可以通过迁移的方式,创建数据库表。
CREATE TABLE `user` (
`user_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '用户ID',
`mobile` char(11) NOT NULL COMMENT '手机号',
`password` varchar(93) NULL COMMENT '密码',
`user_name` varchar(32) NULL COMMENT '昵称',
`gender` tinyint(1) NOT NULL DEFAULT '0' COMMENT '性别',
`birthday` date NULL COMMENT '生日',
`is_delete` tinyint(1) NOT NULL DEFAULT '1' COMMENT '是否删除',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`user_id`),
UNIQUE KEY `mobile` (`mobile`),
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户信息表';CREATE TABLE `car` (
`car_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
`user_id` bigint(20) unsigned NOT NULL COMMENT '用户ID',
`type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '类型',
`name` varchar(20) NOT NULL COMMENT '名称',
`price` decimal(10,2) DEFAULT '0.00' COMMENT '价格',
PRIMARY KEY (`car_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='车辆表';
- from datetime import datetime
-
- from flask import Flask
- # 导入扩展包flask_sqlalchemy
- from flask_sqlalchemy import SQLAlchemy
-
- app = Flask(__name__)
- # 配置数据库的连接信息
- app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:123456@localhost/demo'
- # 关闭动态追踪修改的警告信息
- app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
- # 展示sql语句
- app.config['SQLALCHEMY_ECHO'] = True
-
- # 实例化sqlalchemy对象,并且和程序实例关联
- db = SQLAlchemy(app)
-
-
- class User(db.Model):
- __tablename__ = 'user'
-
- class GENDER:
- MALE = 0
- FEMALE = 1
-
- id = db.Column('user_id', db.Integer, primary_key=True, doc='用户ID')
- mobile = db.Column(db.String, doc='手机号')
- password = db.Column(db.String, doc='密码')
- name = db.Column('user_name', db.String, doc='昵称')
- gender = db.Column(db.Integer, default=GENDER.FEMALE, doc='性别')
- birthday = db.Column(db.Date, doc='生日')
- is_delete = db.Column(db.Boolean, default=False, doc='是否删除')
- # 当模型类字段与表字段不一致,可在Column函数第一个参数指定
- time = db.Column('create_time', db.DateTime, default=datetime.now, doc='创建时间')
- update_time = db.Column('update_time', db.DateTime, default=datetime.now, onupdate=datetime.now, doc='更新时间')
-
- # primaryjoin定义连接条件 : param1:另外一方类名 param2: 具体连接条件
- follows = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')
-
-
- class Car(db.Model):
- __tablename__ = 'car'
-
- class TYPE:
- SUV = 0
- SEDAN = 1
- PICKUP = 2
-
- id = db.Column('car_id', db.Integer, primary_key=True, doc='主键ID')
- user_id = db.Column(db.Integer, doc='用户ID')
- type = db.Column(db.Integer, doc='类型')
- name = db.Column(db.String, doc='名称')
- price = db.Column(db.Numeric, default=0.00, doc='价格')
-
-
- if __name__ == '__main__':
- app.run()
- # 插入一条记录
- user = User(name='张三', mobile='12345678910')
- db.session.add(user)
- db.session.commit()
-
- # 查询记录
- users = User.query.all()
- print(users)
-
- # 更新记录
- user = User.query.filter_by(name='张三').first()
- user.mobile = '12345678910'
- db.session.commit()
-
- # 删除记录
- user = User.query.filter_by(name='张三').first()
- db.session.delete(user)
- db.session.commit()
首先在MySQL中创建数据库,接着定义模型类,通过迁移的方式,创建数据库表。
实现数据库迁移,需要用到扩展包:
flask-script:提供程序运行、迁移的脚本命令
flask-migrate:提供数据库迁移的功能
创建启动文件manage.py实现数据库迁移
- app = Flask(__name__)
-
- # 从flask_script中导入脚本管理器
- from flask_script import Manager
- # 从flask_migrate导入迁移工具、迁移命令
- from flask_migrate import Migrate, MigrateCommand
-
- # 实例化脚本管理器对象
- manager = Manager(app)
-
- # 创建SQLAlchemy对象
- from flask_sqlalchemy import SQLAlchemy
- db = SQLAlchemy()
-
- # 让迁移工具和程序实例app、sqlalchemy实例关联
- Migrate(app, db)
-
- # 添加迁移命令
- manager.add_command('db', MigrateCommand)
-
- if __name__ == '__main__':
- manager.run()
在终端中通过命令执行迁移
初始化迁移仓库:python manage.py db init
生成迁移脚本文件:
python manage.py db migrate
python manage.py db migrate -m init_tables
执行迁移脚本文件:python manage.py db upgrade
通过模型类的数据库会话对象db.session
进行ORM类的CRUD操作,其封装了对数据库的基本操作,如:提交数据、回滚、添加、删除等
增加
- @app.route('/add')
- def add():
- # 添加数据
- user = User(mobile='12345678910', name='flask')
- # 把创建的模型类对象添加给数据库会话对象
- db.session.add(user)
- # 提交数据到数据库中
- db.session.commit()
- # 添加多个数据
- user1 = User(mobile='12345678911', name='flask1')
- user2 = User(mobile='12345678912', name='flask2')
- db.session.add_all([user1, user2])
- # 提交数据到数据库中
- db.session.commit()
- return 'add ok'
-
-
- if __name__ == '__main__':
- app.run()
修改。更新和删除都必须要commit提交数据
- user = User.query.get(1)
- user.name = 'flask'
- db.session.add(user)
- db.session.commit()
-
- User.query.filter_by(id=1).update({'name':'flask'})
- db.session.commit()
删除
- user = User.query.order_by(User.id.desc()).first()
- db.session.delete(user)
- db.session.commit()
-
- User.query.filter(User.mobile='12345678910').delete()
- db.session.commit()
all():查询所有,返回列表
select * from 表名;
User.query.all()first():查询第一个,返回对象
select * from 表名 limit 1;
User.query.first()get():根据主键ID获取对象,若主键不存在返回None
select * from 表名 where id=1;
User.query.get(1)另一种查询方式
db.session.query(User).all()
db.session.query(User).first()
db.session.query(User).get(1)filter_by 过滤查询
条件可以为空,默认查询所有,参数为模型类的字段名。
只能使用赋值运算符,必须使用查询执行器;
select mobile from 表名 where mobile='12345678910'
User.query.filter_by(mobile='12345678910').all()
User.query.filter_by(mobile='12345678910').first()
# 查询条件是and关系
User.query.filter_by(mobile='12345678910', id=1).first()filter:过虑查询。
条件可以为空,默认查询所有,参数为模型类名加上字段名,
可以使用丰富的运算符,保修使用查询执行器;
User.query.filter(User.mobile=='12345678910').first()# 查询所有字段
user = User.query.filter_by(id=1).first()# 查询指定字段
from sqlalchemy.orm import load_only
User.query.options(load_only(User.name, User.mobile)).filter_by(id=1).first()
- from flask_sqlalchemy import SQLAlchemy
-
- db = SQLAlchemy()
-
-
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- username = db.Column(db.String(80), unique=True, nullable=False)
-
-
- user_ids = [1, 2, 5] # 举例的 ID 列表
-
- # SELECT * FROM user WHERE id IN (1, 2, 5);
- users = User.query.filter(User.id.in_(user_ids)).all()
- from flask_sqlalchemy import SQLAlchemy
-
- db = SQLAlchemy()
-
-
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- username = db.Column(db.String(80), unique=True, nullable=False)
-
-
- # SELECT * FROM user WHERE username LIKE '%example%';
- search_1 = "%example%" # '%' 是 SQL 的通配符,匹配任意数量的字符
- users_1 = User.query.filter(User.username.like(search_1)).all()
-
- # 如果你想执行不区分大小写的搜索,可以使用 ilike
- # SELECT * FROM user WHERE username ILIKE '%example%';
- search_2 = "%example%" # 依然使用 '%' 作为通配符
- users_2 = User.query.filter(User.username.ilike(search_2)).all()
多条件复合查询:手机号以123开始,按用户id倒序排序,起始位置2开始,返回3条符合的数据
User.query.filter(User.name.startswith('123')).order_by(User.id.desc()).offset(2).limit(3).all()query = User.query.filter(User.name.startswith('123'))
query = query.order_by(User.id.desc())
query = query.offset(2).limit(3)
ret = query.all()
ORM 默认是全表扫描,使用load_only函数可以指定字段
查询所有用户的拥有的SUV类型的车辆数
from sqlalchemy import funcdb.session.query(Car.user_id, func.count(Car.name)).filter(Car.relation == Car.TYPE.SUV).group_by(Car.user_id).all()
# 一方
class User(db.Model):
# relationship:指定关联对象Car,表示一个用户可以拥有多辆车
cars = db.relationship('Car')# 多方
class Car(db.Model):
# ForeignKey: 指定car属于那个用户
user_id = db.Column(db.Integer, db.ForeignKey('user.user_id'), doc='用户ID')
# 在flask-sqlalchemy中返回模型类对象的数据
def __repr__(self):
car = {
'car_id': self.id,
'name': self.name,
'type': self.type,
'price': self.price,
}
return str(car)@app.route('/test')
def test():
# select * from user where user_id=1
user = User.query.get(1)
print(user)
# select * from car where user_id=1
print(user.cars)
for car in user.cars:
print(car.name)
return 'ok'uselist:返回数据是否已列表形式返回。
- Talse:user.cars得到的是一个对象,
- 否则是一个InstrumentedList类型,需要遍历
class User(db.Model):
cars = db.relationship('Car', uselist=False)@app.route('/test')
def test():
user = User.query.get(1)
print(user)
print(user.cars)
print(user.cars.name)
return 'ok'
在查询时使用反向引用来获取关联对象的属性值
class User(db.Model):
cars = db.relationship('Car', uselist=False, backref='myuser')
@app.route('/test')
def test():
car = Car.query.get(1)
print(car.myuser)
return 'ok'
# 一方
class User(db.Model):
cars = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')# 多方
class Car(db.Model):
id = db.Column('car_id', db.Integer, primary_key=True, doc='主键ID')
user_id = db.Column(db.Integer, doc='用户ID')@app.route('/test')
def test():
user = User.query.get(1)
print(user)
print(user.cars)
for car in user.cars:
print(car.name)
return 'ok'
class User(db.Model):
cars = db.relationship('Car', primaryjoin='User.id==foreign(Car.user_id)')from sqlalchemy.orm import contains_eager,load_only
@app.route('/test')
def test():
# 使用了 join() 和 contains_eager() 方法来实现关联查询# User.query.join(User.cars):在查询 User 表时,关联查询 Cars 表
# options:为查询添加选项
# load_only:指定只加载部分字段,以提高查询效率
# contains_eager:加载 User 表与 Cars 表的关联数据。Cars是User模型中定义的cars属性,它是一个 relationship 属性,表示一个用户可以拥有多个车辆
# oad_only(Car.name):指定只加载 Cars 表中的 user_id 字段,而不加载其他字段
# filter:对查询结果进行过滤
# all:执行查询,并返回查询结果
# sql: SELECT car.car_id AS car_car_id, car.name AS car_name, user.user_id AS user_user_id, user.user_name AS user_user_name FROM user INNER JOIN car ON user.user_id = car.user_id WHERE user.user_name = %s
all = User.query.join(User.cars).options(load_only(User.name),contains_eager(User.cars).load_only(Car.name)).filter(User.name == 'flask').all()
print(all)
for item in all:
print(item)
return 'ok'
flask-sqlalchemy 中自带事务支持,默认开启事务
可以手动触发回滚:db.session.rollback()
@app.route('/test')
def test():
try:
User.query.filter_by(id=1).update({'name': 'rollback'})
1/0
db.session.commit()
except:
db.session.rollback()
return 'ok'
逻辑运算符:"与或非" 都需要导入才能使用。多条件默认是and关系,非就是不等于
比较运算符:>、<、>=、<=、!=、==逻辑或
from sqlalchemy import or_
User.query.filter(or_(User.mobile=='12345678910', User.name.endswith('sk'))).all()逻辑与
from sqlalchemy import and_
User.query.filter(and_(User.name != '12345678910', User.mobile.startswith('sk'))).all()逻辑非
from sqlalchemy import not_
User.query.filter(not_(User.mobile == '12345678910')).all()
offset:偏移,表示起始位置
User.query.offset(2).all()limit:限制返回条数
User.query.limit(2).all()
order_by:asc表示升序,desc表示降序
# 正序
User.query.order_by(User.id).all()# 倒序
User.query.order_by(User.id.desc()).all()
事件监听:https://docs.sqlalchemy.org/en/20/core/event.html
在 flask 中可以使用 Flask 提供的装饰器注册请求回调函数,他们会在 "特定的请求(事件)" 处理环节被执行。类似的,SQLAlchemy 也提供了一个 listen_for() 装饰器,他可以用来注册 事件 回调函数。
listen_for()装饰器主要接收两个参数,target表示监听的对象,这个对象可以是模型类、类实例或类属性等。identifier参数表示被监听事件的标识符,比如,用于监听属性的事件标识符有set、append、remove、init_scalar、init_collection等。
创建一个Draft模型类表示草稿,其中包含body字段和edit_tine字段,分别存储草稿正文和被修改的次数,其中edit_time字段的默认值为0,如下所示:
class Draft(db.Model):
id = db.Column(db.Integer, primary_key = True)
body = db.Column(db.Text)
edit_time = db.Column(db.Integer, default = 0)
通过注册事件监听,我们可以实现在 body 列修改时,自动叠加表示被修改次数的edit_time字段。在SQLAlchemy中,每个事件都会有一个对应的事件方法,不同的事件方法支持不同的参数。被注册的监听函数需要接收对应事件方法的所有参数,所以具体的监听函数用法因使用的事件而异。设置某个字段值将处罚set事件。
app.py: set 事件监听函数
@db.event.listens_for(Draft.body, 'set')
def increment_edit_time(target, value, oldvalue, initiator):
if target.edit_time is not None:
target.edit_time += 1
我们在listen_for()装饰器中分别传入Draft.body和set作为target和identifier参数的值,监听函数接收所有set()事件方法接收的参数,其中的target参数表示触发时间的模型类实例,使用target.edit_time即可获取我们需要叠加的字段。其他的参数也需要照常写出,虽然这里没有用到。value表示被设置的值,oldvalue表示被取代的旧值。
当set事件发生在目标对象Draft.body上时,这个监听函数就会被执行,从而自动叠加Draft.edit_time列的值,如下所示:
>>> draft = Draft(body = 'init')
>>> db.session.add(draft)
>>> db.session.commit()
>>> draft.edit_time
0
>>> draft.body
u'init'
>>> draft.body = 'edited'
>>> draft.edit_time
1
>>> draft.body = 'edited again'
>>> draft.edit_time
2
>>> draft.body = 'edited again again'
>>> draft.edit_time
3
>>> db.session.commit()
除了这种传统的参数接收方式,即接收所有事件方法接收的参数,还有一种更简单的方法。通过在listen_for()装饰器中将关键字参数name设为True,可以在监听函数中接收**kwargs作为参数(可变长关键字参数), 即“named argument”(命名参数)。然后在函数中可以使用参数名作为键来从**kwargs字典获取对应的参数值:
@db.event.listens_for(Draft.body, 'set', named = True)
def increment_edit_time(**kwargs):
if kwargs['target'].edit_time is not None:
kwargs['target'].edit_time += 1>>> draft = Draft.query.first()
>>> draft
<Draft 1>
>>> draft.body
u'edited again again'
>>> draft.edit_time
3
>>> draft.body = 'edited 3 times'
>>> draft.edit_time
4
SQLAlchemy 作为SQL工具基本身包含两大主要组件:SQLAlchemy ORM 和 SQLAlchemy Core。前者实现了ORM功能,后者实现了数据库接口等核心功能,这两类组件都提供了大量的监听事件,几乎覆盖了SQLAlchemy使用的声明周期。
除了使用listen_for装饰器,我们还可以直接使用它内部调用的 listen() 函数注册事件监听函数,第三个参数传入被注册的函数对象,比如 db.event.listen(SomeClass, ‘load’, my_load_listener)。
Cascade意为 "级联操作",就是在操作一个对象的同时,对相关的对象也执行某些操作。我们通过一个Post模型和Comment模型来演示级联操作,分别表示文章(帖子)和评论,两者是一对多关系:
class Post(db.Model):
id = db.Column(db.Integer, primary_key = True)
title = db.Column(db.String(50), unique = True)
body = db.Column(db.Text)
comments = db.relationship('Comment', back_populates = 'post')
class Comment(db.Model):
id = db.Column(db.Integer, primary_key = True)
body = db.Column(db.Text)
post_id = db.Column(db.Integer, db.ForeignKey('post.id'))
post = db.relationship('Post', back_populates = 'comments')
级联行为通过关系函数relationship()的cascade参数设置。我们希望在操作Post对象时,处于附属地位的Comment对象也被相应执行某些操作,这时应该在Post类的关系函数中定义级联参数。设置了cascade参数的一侧将被视为父对象,相关的对象则被视为子对象。
cascade通常使用多个组合值,级联值之间使用逗号分隔,比如:
class Post(db.Model):
…
comments = db.relationship('Comment', cascade = 'save-update, merge,delete',back_populates = 'post')
常用的配置组合如下所示:
当没有设置cascade参数时,会使用默认值save-upgrate、merge。上面的all等同于除了delete-orphan以外所有可用值的组合,即save-update、merge、refresh-expire、expunge、delete。
下面介绍常用的几个级联值:
1、save-update
save-update是默认的级联行为,当cascade参数设为save-update时,如果使用db.session.add()方法将Post对象添加到数据库会话时,那么与Post相关联的Comment对象也将被添加到数据库会话。我们首先创建一个Post对象和两个Comment对象:
>>> post = Post()
>>> comment1 = Comment()
>>> comment2 = Comment()
将post1添加到数据库会话后,只有post1在数据库会话中:
>>> db.session.add(post)
>>> post in db.session
True
>>> comment1 in db.session
False
>>> comment2 in db.session
False
如果我们让post1与这两个Comment对象建立关系,那么这两个Comment对象也会自动被添加到数据库会话中:
>>> post.comments.append(comment1)
>>> post.comments.append(comment2)
>>> comment1 in db.session
True
>>> comment2 in db.session
True
当调用db.session.commit()提交数据库会话时,这三个对象都会被提交到数据库中。
2、delete
如果某个Post对象被删除,那么按照默认的行为,该Post对象相关联的所有Comment对象都将与这个Post对象取消关联,外键字段的值会被清空。如果Post类的关系函数中cascade参数设为delete时,这些相关的Comment会在关联的Post对象删除时一并删除,当需要设置delete级联时,我们会将级联值设为all或save-update、merge、delete,比如:
class Post(db.Model):
id = db.Column(db.Integer, primary_key = True)
title = db.Column(db.String(50), unique = True)
body = db.Column(db.Text)
comments = db.relationship('Comment',cascade = 'all', back_populates = 'post')
我们先创建一个文章对象post2和两个评论对象comment3和comment4,并将这两个评论对象与文章对象建立关系,将它们添加到数据库会话并提交:
>>> comment3 = Comment(body = 'very good')
>>> comment4 = Comment(body = 'excellent')
>>> post2 = Post(title = 'i have a good plan', body = 'tomorrow i will go to climbing')
>>> post2.comments.append(comment3)
>>> post2.comments.append(comment4)
>>> db.session.add(post2)
>>> db.session.commit()
现在共有两条Post记录和四条Comment记录:
>>> Post.query.all()
[<Post 1>, <Post 2>]
>>> Commment.query.all()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'Commment' is not defined
>>> Comment.query.all()
[<Comment 1>, <Comment 2>, <Comment 3>, <Comment 4>]
如果删除文对象Post2,那么对应的两个评论对象也会一并被删除:
>>> post = Post.query.get(2)
>>> post
<Post 2>
>>> db.session.delete(post)
>>> db.session.commit()
>>> Post.query.all()
[<Post 1>]
>>> Comment.query.all()
[<Comment 1>, <Comment 2>]
3、delete-orphan
这个模式是基于delete级联的,必须和delete级联一起使用,通常会设为all、delete-orphan,因为all包含delete。因此当cascade参数设为delete-orphan时,它首先包含delete级联的行为:当某个Post对象被删除时,所有相关的Comment对象都将被删除(delete级联)。除此之外,当某个Post对象(父对象)与某个Comment对象(子对象)解除关系时,也会删除该Comment对象,这个解除关系的对象被称为鼓励对象(orphan object),现在comments属性中的级联值为all、delete-orphan,如下所示:
class Post(db.Model):
id = db.Column(db.Integer, primary_key = True)
title = db.Column(db.String(50), unique = True)
body = db.Column(db.Text)
comments = db.relationship('Comment',cascade = 'all, delete-orphan', back_populates = 'post')
我们先创建一个文章对象post3和两个评论对象comment5和comment6,并将这两个评论对象与文章对象建立关系,将他们添加到数据库会话并提交:
>>> post3 = Post()
>>> post3 = Post(title = 'today learn python', body = 'python include class and object')
>>> comment5 = Comment(body = 'i also wanna learn python')
>>> comment6 = Comment(body = 'python is easy to learn, but you have to pay your time every day')
>>> post3.comments.append(comment5)
>>> post3.comments.append(comment6)
>>> db.session.add(post3)
>>> db.session.commit())
File "<stdin>", line 1
db.session.commit())
^
SyntaxError: invalid syntax
>>> db.session.commit()
现在数据库中有两条文章记录和四条评论记录:
>>> Post.query.all()
[<Post 1>, <Post 2>]
>>> Comment.query.all()
[<Comment 1>, <Comment 2>, <Comment 3>, <Comment 4>]
下面我们将comment5和comment6与post3解除关系并提交数据库会话:
>>> post3.comments.remove(comment5)
>>> post3.comments.remove(comment6)
>>> db.session.commit()
默认情况下,相关评论对象的外键会被设为空值。因为我们设置了delete-orphan级联,所以现在你会发现解除关系的两条评论记录都被删除了:
>>> Comment.query.all()
[<Comment 1>, <Comment 2>]
delete和delete-orphan通常会在一对多关系模式中,而且“多”这一侧的对象附属于“一”这一侧的对象时使用。尤其是如果“一”这一侧的“父”对象不存在了,那么“多”这一侧的“子”对象不再有意义的情况。比如,文章和评论的关系就是一个典型的示例。当文章被删除了,那么评论也就没必要在留存。在这种情况下,如果不使用级联操作,那么我们就需要手动迭代关系另一侧的所有评论对象,然后一一进行删除操作。
对于这两个级联选项,如果你不会通过列表语义对集合关系属性调用remove()方法等方式来操作关系,那么使用delete级联即可。
虽然级联操作方便,但是容易带来安全隐患,因此要谨慎使用。默认值能够满足大部分情况,所以最好仅在需要的时候才修改它。
在SQLAlchemy中,级联的行为和配置选项等最初衍生自另一个ORM—Hibernate ORM。如果对这部分内容感到困惑,那么引用SQLAlchemy文档中关于Hibernate文档的结论:“The sections we have just covered can be a bit confusing.However, in practice, it all works out nicely. (我们刚刚介绍的这部分内容可能会有一些让人困惑,不过在实际使用中,他们都会工作的很顺利)”
:https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/
Flask-migrate:https://flask-migrate.readthedocs.io/en/latest/index.html
安装:pip install Flask-Migrate
使用Flask-Migrate迁移数据库
在开发时,以删除表再重建的方式更新数据库简单直接,但明显的缺陷是会丢掉数据库中的所有数据。在生产环境下,没有人想把数据都删除掉,这时需要使用数据库迁移工具来完成这个工作。SQLAlchemy的开发者Michael Bayer写了一个数据库迁移工作—Alembic来帮助我们实现数据库的迁移,数据库迁移工具可以在不破坏数据的情况下更新数据库表的结构。蒸馏器(Alembic)是炼金术士最重要的工具,要学习SQL炼金术(SQLAlchemy),当然要掌握蒸馏器的使用。
Flask-Migrate 插件(扩展 ) 继承了Alembic,是用于处理 SQLAlchemy 数据库迁移的扩展工具。当 Model 出现变更的时候,通过migrate 去管理数据库变更。Flask-Migrate 提供了一些flask命令来简化迁移工作,可以使用它来迁移数据库。
Migrate主要有3个动作,init、migrate 和upgrade。
示例代码
- from flask import Flask, render_template, flash, url_for, redirect
- from flask_sqlalchemy import SQLAlchemy
- from flask_migrate import Migrate
-
- app = Flask(__name__)
- app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
-
- db = SQLAlchemy(app)
- migrate = Migrate(app, db) # 在db对象创建后调用
-
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.String(128))
命名为 app.py 才可正常执行。
开始迁移数据之前,需要先使用命令初始化创建一个迁移环境,命令:flask db init
迁移环境只需要创建一次。会在你的项目根目录下创建一个migrations文件夹其中包含了自动生成的配置文件和迁移版本文件夹。
Flask-Migrate 提供了一个命令集,使用 db 作为命令集名称,它提供的命令都以 flask db 开头。可以在命令行中输入 flask --help 查看所有可用的命令和说明。
D:\flask\FLASK_PRACTICE\DataBase>flask db --help
Usage: flask db [OPTIONS] COMMAND [ARGS]...Perform database migrations.
Options:
--help Show this message and exit.Commands:
branches Show current branch points
current Display the current revision for each database.
downgrade Revert to a previous version
edit Edit a revision file
heads Show current available heads in the script directory
history List changeset scripts in chronological order.
init Creates a new migration repository.
merge Merge two revisions together, creating a new revision file
migrate Autogenerate a new revision file (Alias for 'revision...
revision Create a new revision file.
show Show the revision denoted by the given symbol.
stamp 'stamp' the revision table with the given revision; don't run...
upgrade Upgrade to a later version
命令:flask db migrate
此命令会在 migrations下生成一个version文件夹,下面包含了对应版本的数据库操作py脚本。
由于 migrate 并不一定全部发现你对 model 的所有改动,因此生成的 p y脚本需要 review,有错的话则需要 edit。
例如目前知道的,表名称表更,列名称变更,或给 constraints 命名等,migreate 都不能发现的。更多限制细节见此:Alembic autogenerate documentation
生成迁移脚本--flask db migrate -m "add note timestamp"
这条命令可以简单理解为在flask里对数据库(db)进行迁移(migrate)。-m选项用来添加迁移备注信息。从上面的输出信息我们可以看到,Alembic检测出了模型变化:表note新加了一个timestamp列,并且相应生成了一个迁移脚本cdd9d12762fc_add_note_timestamp.py:
- """add note timestamp
- Revision ID: 7f3dae8cae4d
- Revises:
- Create Date: 2019-04-01 21:56:32.469000
- """
- from alembic import op
- import sqlalchemy as sa
-
-
- # revision identifiers, used by Alembic.
- revision = '7f3dae8cae4d'
- down_revision = None
- branch_labels = None
- depends_on = None
-
-
- def upgrade():
- # ### commands auto generated by Alembic - please adjust! ###
- op.drop_table('draft')
- op.drop_table('post')
- op.drop_table('comment')
- op.add_column('note', sa.Column('timeStamp', sa.String(length=70), nullable=True))
- op.create_unique_constraint(None, 'note', ['timeStamp'])
- # ### end Alembic commands ###
-
-
- def downgrade():
- # ### commands auto generated by Alembic - please adjust! ###
- op.drop_constraint(None, 'note', type_='unique')
- op.drop_column('note', 'timeStamp')
- op.create_table('comment',
- sa.Column('id', sa.INTEGER(), nullable=False),
- sa.Column('body', sa.TEXT(), nullable=True),
- sa.Column('post_id', sa.INTEGER(), nullable=True),
- sa.ForeignKeyConstraint(['post_id'], [u'post.id'], ),
- sa.PrimaryKeyConstraint('id')
- )
- op.create_table('post',
- sa.Column('id', sa.INTEGER(), nullable=False),
- sa.Column('title', sa.VARCHAR(length=50), nullable=True),
- sa.Column('body', sa.TEXT(), nullable=True),
- sa.PrimaryKeyConstraint('id')
- )
- op.create_table('draft',
- sa.Column('id', sa.INTEGER(), nullable=False),
- sa.Column('body', sa.TEXT(), nullable=True),
- sa.Column('edit_time', sa.INTEGER(), nullable=True),
- sa.PrimaryKeyConstraint('id')
- )
- # ### end Alembic commands ###
从上面的代码可以看出,迁移脚本主要包含了两个函数:upgrate()函数用来将改动应用到数据库,函数中包含了向表中添加timestamp字段的命令,而downgrade()函数用来撤消改动,包含了删除timestamp字段的命令。
就像这两个函数中的注释所说的,迁移命令是有Alembic自动生成的,其中可能包含错误,所以有必要在生成后检查一下。
因为每一次迁移都会生成新的迁移脚本,而且Alemic为每一次迁移都生成了修订版本(revision)ID,所以数据库可以恢复到修改历史中的任一点。正因如此,迁移环境中的文件也要纳入版本控制。
有些复杂的错误无法实现自动迁移,这时可以使用revision命令手动创建迁移脚本。这同样会生成一个迁移脚本,不过脚本中的upgrade()和downgrade()函数都是空的。你需要使用Alembic提供的Operations对象指令在这两个函数中实现具体操作,具体可以访问Alembic官方文档查看。
生成了迁移脚本后,使用upgrade子命令即可更新数据库。
命令:flask db upgrade
如果还没有创建数据库和表,这个命令会自动创建,如果已经创建,则会在不损坏数据的前提下执行更新。此命令相当于执行了 version 文件夹下的相应 py 版本,对数据库进行变更操作。
此后,对 model 有变更,只要重复 migrate 和 upgrade 操作即可。
查看帮助文档:flask db --help
使用Flask-Script的命令调用,自行参考官方文档:https://flask-migrate.readthedocs.io/en/latest/
如果你想回滚迁移,那么可以使用downgrade命令(降级),它会撤销最后一次迁移在数据库中的改动,这在开发时非常有用。比如,当执行upgrade命令后发现某些地方出错了,这时就可以执行flask db downgrade命令进行回滚,删除对应的迁移脚本,重新生成迁移脚本后再进行更新(upgrade)。
D:\flask\FLASK_PRACTICE\DataBase>flask db downgrade 5e87b4da6187
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
ERROR [root] Error: Destination 5e87b4da6187 is not a valid downgrade target from current head(s)
Flask干货:访问数据库:Flask-Script工具的使用:https://zhuanlan.zhihu.com/p/269820011
什么是 flask_script ?
通过命令行的方式来操作flask, 一般用来启动定时任务,数据库的迁移与更新等。
安装 flask_script :pip install flask_script
实现一个最小应用
- # app.py
- from flask import Flask
- from flask_script import Manager
- app = Flask(__name__)
- manager = Manager(app=app)
-
- @manager.command
- def init():
- print('初始化数据库')
- @manager.command
- def migrate():
- print('数据迁移')
- @manager.command
- def upgrade():
- print('数据更新')
-
- if __name__ == '__main__':
- manager.run()
flask_script
- # 命令行中启动
- > python manager.py init
- > python manager.py migrate
- > python manager.py upgrade
结构升级进行功能拆分。
ext.py
- # ext.py
- from flask_script import Manager
- DBMANAGER = Manager()
- @DBMANAGER.command
- def init():
- print('数据库初始化')
- @DBMANAGER.command
- def migrate():
- print('数据迁移')
- @DBMANAGER.command
- def upgrade():
- print('数据更新')
app.py
- # app.py
- from flask import Flask
- from flask_script import Manager
- from ext import DBMANAGER
- app = Flask(__name__)
- manager = Manager(app=app)
- manager.add_command('db',DBMANAGER)
- if __name__ == '__main__':
- manager.run()
- # 命令行中启动
- > python manager.py init
- > python manager.py migrate
- > python manager.py upgrad
pypi 搜索:https://pypi.org/search/?q=flask-upload
flask-upload:https://pythonhosted.org/Flask-Uploads/
flask-upload 插件,使 flask 能够灵活高效地处理文件上传。 可以创建不同的上传集。例如:用于图片的上传集合,用户音频的上传集合,用于视频的上传集合等
也可以直接使用 flask 处理文件上传:https://www.w3cschool.cn/flask/flask_file_uploading.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。