赞
踩
pip install flask-sqlalchemy
pip install mysqlclient
pip install pymysql
https://flask-sqlalchemy.palletsprojects.com/en/2.x/
协议名://用户名:密码@数据库IP:端口号/数据库名
, 如:app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31'
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 设置数据库连接地址
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31'
# 是否追踪数据库修改(开启后会触发一些钩子函数) 一般不开启, 会影响性能
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 是否显示底层执行的SQL语句
app.config['SQLALCHEMY_ECHO'] = True
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 应用配置
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True
# 方式1: 初始化组件对象, 直接关联Flask应用
db = SQLAlchemy(app)
* 方式2: 先创建组件, 延后关联Flass应用
from flask import Flask from flask_sqlalchemy import SQLAlchemy # 方式2: 初始化组件对象, 延后关联Flask应用 db = SQLAlchemy() def create_app(config_type): """工厂函数""" # 创建应用 flask_app = Flask(__name__) # 加载配置 config_class = config_dict[config_type] flask_app.config.from_object(config_class) # 关联flask应用 db.init_app(app) return flask_app
flask框架 官方建议 所有的扩展包 都支持这两种初始化方案, 其他flask扩展 也可以尝试使用
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True # 创建组件对象 db = SQLAlchemy(app) # 构建模型类 类->表 类属性->字段 实例对象->记录 class User(db.Model): __tablename__ = 't_user' # 设置表名, 表名默认为类名小写 id = db.Column(db.Integer, primary_key=True) # 设置主键, 默认自增 name = db.Column('username', db.String(20), unique=True) # 设置字段名 和 唯一约束 age = db.Column(db.Integer, default=10, index=True) # 设置默认值约束 和 索引 if __name__ == '__main__': # 删除所有继承自db.Model的表 db.drop_all() # 创建所有继承自db.Model的表 db.create_all() app.run(debug=True)
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True # 创建组件对象 db = SQLAlchemy(app) # 构建模型类 class User(db.Model): __tablename__ = 't_user' id = db.Column(db.Integer, primary_key=True) name = db.Column('username', db.String(20), unique=True) age = db.Column(db.Integer, index=True) @app.route('/') def index(): """增加数据""" # 1.创建模型对象 user1 = User(name='zs', age=20) # user1.name = 'zs' # user1.age = 20 # 2.将模型对象添加到会话中 db.session.add(user1) # 添加多条记录 # db.session.add_all([user1, user2, user3]) # 3.提交会话 (会提交事务) # sqlalchemy会自动创建隐式事务 # 事务失败会自动回滚 db.session.commit() return "index" if __name__ == '__main__': db.drop_all() db.create_all() app.run(debug=True)
# hm_03_数据查询.py from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:mysql@127.0.0.1:3306/test31" app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False app.config["SQLALCHEMY_ECHO"] = False db = SQLAlchemy(app) # 自定义类 继承db.Model 对应 表 class User(db.Model): __tablename__ = "users" # 表名 默认使用类名的小写 # 定义类属性 记录字段 id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64)) email = db.Column(db.String(64)) age = db.Column(db.Integer) def __repr__(self): # 自定义 交互模式 & print() 的对象打印 return "(%s, %s, %s, %s)" % (self.id, self.name, self.email, self.age) @app.route('/') def index(): """ 查询所有用户数据 查询有多少个用户 查询第1个用户 查询id为4的用户[3种方式] 查询名字结尾字符为g的所有用户[开始 / 包含] 查询名字和邮箱都以li开头的所有用户[2种方式] 查询age是25 或者 `email`以`itheima.com`结尾的所有用户 查询名字不等于wang的所有用户[2种方式] 查询id为[1, 3, 5, 7, 9]的用户 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个 查询年龄从小到大第2-5位的数据 分页查询, 每页3个, 查询第2页的数据 查询每个年龄的人数 select age, count(name) from t_user group by age 分组聚合 只查询所有人的姓名和邮箱 优化查询 默认使用select * """ return 'index' if __name__ == '__main__': # 删除所有表 db.drop_all() # 创建所有表 db.create_all() # 添加测试数据 user1 = User(name='wang', email='wang@163.com', age=20) user2 = User(name='zhang', email='zhang@189.com', age=33) user3 = User(name='chen', email='chen@126.com', age=23) user4 = User(name='zhou', email='zhou@163.com', age=29) user5 = User(name='tang', email='tang@itheima.com', age=25) user6 = User(name='wu', email='wu@gmail.com', age=25) user7 = User(name='qian', email='qian@gmail.com', age=23) user8 = User(name='liu', email='liu@itheima.com', age=30) user9 = User(name='li', email='li@163.com', age=28) user10 = User(name='sun', email='sun@163.com', age=26) # 一次添加多条数据 db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10]) db.session.commit() app.run(debug=True)
交互模式测试
$ workon flask_env # 进入虚拟环境 $ ipython # 进入交互模式 In [1]: from hm_03_数据查询 import * # 导入项目环境 In [2]: User.query.all() # 测试查询语句: 查询所有用户 Out[2]: [(1, wang, wang@163.com, 20), (2, zhang, zhang@189.com, 33), (3, chen, chen@126.com, 23), (4, zhou, zhou@163.com, 29), (5, tang, tang@itheima.com, 25), (6, wu, wu@gmail.com, 25), (7, qian, qian@gmail.com, 23), (8, liu, liu@itheima.com, 30), (9, li, li@163.com, 28), (10, sun, sun@163.com, 26)]
# 查询所有用户数据 User.query.all() 返回列表, 元素为模型对象 # 查询有多少个用户 User.query.count() # 查询第1个用户 User.query.first() 返回模型对象/None # 查询id为4的用户[3种方式] # 方式1: 根据id查询 返回模型对象/None User.query.get(4) # 方式2: 等值过滤器 关键字实参设置字段值 返回BaseQuery对象 # BaseQuery对象可以续接其他过滤器/执行器 如 all/count/first等 User.query.filter_by(id=4).all() # 方式3: 复杂过滤器 参数为比较运算/函数引用等 返回BaseQuery对象 User.query.filter(User.id == 4).first() # 查询名字结尾字符为g的所有用户[开始 / 包含] User.query.filter(User.name.endswith("g")).all() User.query.filter(User.name.startswith("w")).all() User.query.filter(User.name.contains("n")).all() User.query.filter(User.name.like("w%n%g")).all() # 模糊查询 # 查询名字和邮箱都以li开头的所有用户[2种方式] User.query.filter(User.name.startswith('li'), User.email.startswith('li')).all() from sqlalchemy import and_ User.query.filter(and_(User.name.startswith('li'), User.email.startswith('li'))).all() # 查询age是25 或者 `email`以`itheima.com`结尾的所有用户 from sqlalchemy import or_ User.query.filter(or_(User.age==25, User.email.endswith("itheima.com"))).all() # 查询名字不等于wang的所有用户[2种方式] from sqlalchemy import not_ User.query.filter(not_(User.name == 'wang')).all() User.query.filter(User.name != 'wang').all() # 查询id为[1, 3, 5, 7, 9]的用户 User.query.filter(User.id.in_([1, 3, 5, 7, 9])).all() # 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个 User.query.order_by(User.age, User.id.desc()).limit(5).all() # 查询年龄从小到大第2-5位的数据 2 3 4 5 User.query.order_by(User.age).offset(1).limit(4).all() # 分页查询, 每页3个, 查询第2页的数据 paginate(页码, 每页条数) pn = User.query.paginate(2, 3) pn.pages 总页数 pn.page 当前页码 pn.items 当前页的数据 pn.total 总条数 # 查询每个年龄的人数 select age, count(name) from t_user group by age 分组聚合 from sqlalchemy import func data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all() for item in data: # print(item[0], item[1]) print(item.age, item.count) # 建议通过label()方法给字段起别名, 以属性方式获取数据 # 只查询所有人的姓名和邮箱 优化查询 User.query.all() # 相当于select * from sqlalchemy.orm import load_only data = User.query.options(load_only(User.name, User.email)).all() # flask-sqlalchem的语法 for item in data: print(item.name, item.email) data = db.session.query(User.name, User.email).all() # sqlalchemy本体的语法 for item in data: print(item.name, item.email)
查询语法
1. 先查询,再更新
2. 基于过滤条件的更新
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True # 创建组件对象 db = SQLAlchemy(app) # 构建模型类 商品表 class Goods(db.Model): __tablename__ = 't_good' # 设置表名 id = db.Column(db.Integer, primary_key=True) # 设置主键 name = db.Column(db.String(20), unique=True) # 商品名称 count = db.Column(db.Integer) # 剩余数量 @app.route('/') def purchase(): """购买商品""" # 更新方式1: 先查询后更新 # 缺点: 并发情况下, 容易出现更新丢失问题 (Lost Update) # 1.执行查询语句, 获取目标模型对象 goods = Goods.query.filter(Goods.name == '方便面').first() # 2.对模型对象的属性进行赋值 (更新数据) goods.count = goods.count - 1 # 3.提交会话 db.session.commit() return "index" if __name__ == '__main__': # 删除所有继承自db.Model的表 db.drop_all() # 创建所有继承自db.Model的表 db.create_all() # 添加一条测试数据 goods = Goods(name='方便面', count=1) db.session.add(goods) db.session.commit() app.run(debug=True)
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True # 创建组件对象 db = SQLAlchemy(app) # 构建模型类 商品表 class Goods(db.Model): __tablename__ = 't_good' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(20), unique=True) count = db.Column(db.Integer) @app.route('/') def purchase(): """购买商品""" # 更新方式2: update子查询 可以避免更新丢失问题 # update t_good set count = count - 1 where name = '方便面'; Goods.query.filter(Goods.name == '方便面').update({'count': Goods.count - 1}) # 提交会话 db.session.commit() return "index" if __name__ == '__main__': # 重置数据库数据 db.drop_all() db.create_all() # 添加一条测试数据 goods = Goods(name='方便面', count=1) db.session.add(goods) db.session.commit() app.run(debug=True)
1. 新查询,再删除
2. 基于过滤条件的删除
@app.route('/del')
def delete():
"""删除数据"""
# 方式1: 先查后删除
goods = Goods.query.filter(Goods.name == '方便面').first()
# 删除数据
db.session.delete(goods)
# 提交会话 增删改都要提交会话
db.session.commit()
return "index"
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True # 创建组件对象 db = SQLAlchemy(app) # 构建模型类 商品表 class Goods(db.Model): __tablename__ = 't_good' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(20), unique=True) count = db.Column(db.Integer) @app.route('/del') def delete(): """删除数据""" # 方式2: delete子查询 Goods.query.filter(Goods.name == '方便面').delete() # 提交会话 db.session.commit() return "index" if __name__ == '__main__': # 重置数据库数据 db.drop_all() db.create_all() # 添加一条测试数据 goods = Goods(name='方便面', count=1) db.session.add(goods) db.session.commit() app.run(debug=True)
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True db = SQLAlchemy(app) # 构建模型类 class Goods(db.Model): __tablename__ = 't_good' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(20), unique=True) count = db.Column(db.Integer) @app.route('/') def purchase(): goods = Goods(name='方便面', count=20) db.session.add(goods) # 主动执行flush操作, 立即执行SQL操作(数据库同步) db.session.flush() # Goods.query.count() # 查询操作会自动执行flush操作 db.session.commit() # 提交会话会自动执行flush操作 return "index" if __name__ == '__main__': db.drop_all() db.create_all() app.run(debug=True)
1. 数据关联
2. 关联查询
1. 从表模型类中定义外键字段
2. 从表模型对象的外键字段记录主表主键
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = False # 创建组件对象 db = SQLAlchemy(app) # 用户表 主表(一) 一个用户可以有多个地址 class User(db.Model): __tablename__ = 't_user' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(20)) # 地址表 从表(多) class Address(db.Model): __tablename__ = 't_adr' id = db.Column(db.Integer, primary_key=True) detail = db.Column(db.String(20)) user_id = db.Column(db.Integer) # 定义外键 @app.route('/') def index(): """添加并关联数据""" user1 = User(name='张三') db.session.add(user1) db.session.flush() # 需要手动执行flush操作, 让主表生成主键, 否则外键关联失败 # db.session.commit() # 有些场景下, 为了保证数据操作的原子性不能分成多个事务进行操作 adr1 = Address(detail='中关村3号', user_id=user1.id) adr2 = Address(detail='华强北5号', user_id=user1.id) db.session.add_all([adr1, adr2]) db.session.commit() return "index" if __name__ == '__main__': db.drop_all() db.create_all() app.run(debug=True)
1. 查询主表数据
2. 在通过外键字段查询关联的从表数据
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = False # 创建组件对象 db = SQLAlchemy(app) # 用户表 一 一个用户可以有多个地址 class User(db.Model): __tablename__ = 't_user' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(20)) # 地址表 多 class Address(db.Model): __tablename__ = 't_adr' id = db.Column(db.Integer, primary_key=True) detail = db.Column(db.String(20)) user_id = db.Column(db.Integer) # 定义外键 @app.route('/demo') def demo(): """查询多表数据 需求: 查询姓名为"张三"的所有地址信息""" # 1.先根据姓名查找到主表主键 user1 = User.query.filter_by(name='张三').first() # 2.再根据主键到从表查询关联地址 adrs = Address.query.filter_by(user_id=user1.id).all() for adr in adrs: print(adr.detail) return "demo" @app.route('/') def index(): """添加并关联数据""" user1 = User(name='张三') db.session.add(user1) db.session.flush() adr1 = Address(detail='中关村3号', user_id=user1.id) adr2 = Address(detail='华强北5号', user_id=user1.id) db.session.add_all([adr1, adr2]) db.session.commit() return "index" if __name__ == '__main__': db.drop_all() db.create_all() app.run(debug=True)
sqalchemy
装的一套查询关联数据的语法, 其目的为 让开发者使用 面向对象的形式 方便快捷的获取关联数据from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = False # 创建组件对象 db = SQLAlchemy(app) # 用户表 一 一个用户可以有多个地址 class User(db.Model): __tablename__ = 't_user' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(20)) addresses = db.relationship('Address') # 1.定义关系属性 relationship("关联数据所在的模型类") # 地址表 多 class Address(db.Model): __tablename__ = 't_adr' id = db.Column(db.Integer, primary_key=True) detail = db.Column(db.String(20)) # 2. 外键字段设置外键参数 db.ForeignKey('主表名.主键') user_id = db.Column(db.Integer, db.ForeignKey('t_user.id')) @app.route('/') def index(): """添加数据""" user1 = User(name='张三') db.session.add(user1) db.session.flush() adr1 = Address(detail='中关村3号', user_id=user1.id) adr2 = Address(detail='华强北5号', user_id=user1.id) db.session.add_all([adr1, adr2]) db.session.commit() """查询多表数据 需求: 查询姓名为"张三"的所有地址信息""" # 先根据姓名查找用户主键 user1 = User.query.filter_by(name='张三').first() # 3.使用关系属性获取关系数据 for address in user1.addresses: print(address.detail) return "index" if __name__ == '__main__': # 重置所有继承自db.Model的表 db.drop_all() db.create_all() app.run(debug=True)
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test31' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = False # 创建组件对象 db = SQLAlchemy(app) # 用户表 一 class User(db.Model): __tablename__ = 't_user' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(20)) # 地址表 多 class Address(db.Model): __tablename__ = 't_adr' id = db.Column(db.Integer, primary_key=True) detail = db.Column(db.String(20)) user_id = db.Column(db.Integer) # 定义外键 @app.route('/demo') def demo(): """查询多表数据 需求: 查询姓名为"张三"的用户id和地址信息""" # sqlalchemy的join查询 data = db.session.query(User.id, Address.detail).join(Address, User.id == Address.user_id).filter(User.name == '张三').all() for item in data: print(item.detail, item.id) return "demo" @app.route('/') def index(): """添加数据""" user1 = User(name='张三') db.session.add(user1) db.session.flush() adr1 = Address(detail='中关村3号', user_id=user1.id) adr2 = Address(detail='华强北5号', user_id=user1.id) db.session.add_all([adr1, adr2, user1]) db.session.commit() return 'index' if __name__ == '__main__': db.drop_all() db.create_all() app.run(debug=True)
# 使用join语句优化关联查询
adrs = Address.query.join(User, Address.user_id == User.id).filter(User.name == '张三').all() # 列表中包含地址模型对象
Session.rollback
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/toutiao' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ECHO'] = True db = SQLAlchemy(app) # 构建模型类 class User(db.Model): __tablename__ = 't_user' id = db.Column(db.Integer, primary_key=True) name = db.Column('username', db.String(20), unique=True) age = db.Column(db.Integer, default=0, index=True) @app.route('/') def index(): """事务1""" try: user1 = User(name='zs', age=20) db.session.add(user1) db.session.commit() except BaseException: # 手动回滚 同一个session中, 前一个事务如果失败, 必须手动回滚, 否则无法创建新的事务 db.session.rollback() """事务2""" user1 = User(name='lisi', age=30) db.session.add(user1) db.session.commit() return "index" if __name__ == '__main__': """为了进行测试, 首次运行 建表并添加一条测试数据后, 注释下方代码, 并重新运行测试""" # 重置所有继承自db.Model的表 # db.drop_all() # db.create_all() # 添加一条测试数据 # user1 = User(name='zs', age=20) # db.session.add(user1) # db.session.commit() app.run(debug=True)
# hm_数据迁移.py from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate app = Flask(__name__) # 相关配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test32' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # SQlalchemy组件初始化 db = SQLAlchemy(app) # 迁移组件初始化 Migrate(app, db) # 构建模型类 class User(db.Model): __tablename__ = 't_user' id = db.Column(db.Integer, primary_key=True) name = db.Column('username', db.String(20), unique=True) age= db.Column(db.Integer, default=10, index=True) @app.route('/') def index(): return "index" if __name__ == '__main__': app.run(debug=True)
1. export FLASK_APP=hm_数据迁移.py # 设置环境变量指定启动文件
2. flask db init # 生成迁移文件夹
3. flask db migrate # ⽣成迁移版本, 保存到迁移文件夹中
4. flask db upgrade # 执行迁移
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。