赞
踩
# exts.py:插件管理
# 扩展的第三方插件
# 1.导入第三方插件
from flask_sqlalchemy import SQLAlchemy # ORM插件
from flask_migrate import Migrate
# 2. 初始化
db = SQLAlchemy() # ORM
migrate = Migrate() # 数据迁移
# 3. 和app对象绑定
def init_exts(app):
db.init_app(app=app)
migrate.init_app(app=app, db=db)
# __init__.py :初始化文件,创建Flask应用 from flask import Flask from .views import blue from .exts import init_exts def create_app(): app = Flask(__name__) # 注册蓝图 app.register_blueprint(blueprint=blue) # 配置数据库 db_uri = 'sqlite:///sqlite3.db' # db_uri = 'mysql+pymysql://root:123456@localhost:3306/flaskdb' # mysql的配置 app.config['SQLALCHEMY_DATABASE_URI'] = db_uri app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 禁止对象追踪修改 # 初始化插件 init_exts(app=app) return app
# models.py : 模型,数据库 from .exts import db # 模型 数据库 # 类 ==> 表结构 # 类属性 ==> 表字段 # 一个对象 ==> 表的一行数据 # 模型Model:类 # 必须继承 db.Model class User(db.Model): # 表名 __tablename__ = 'tb_user' # 定义表字段 id = db.Column(db.Integer, primary_key=True, autoincrement=True) name = db.Column(db.String(30), unique=True, index=True) age = db.Column(db.Integer, default=1) sex = db.Column(db.Boolean, default=True) salary = db.Column(db.Float, default=100000, nullable=False) salary2 = db.Column(db.Float, default=100000, nullable=False) # db.Column : 表示字段 # db.Integer:表示整数 # primary_key=True : 主键 # autoincrement=True : 自动递增 # db.String(30): varchar(30) 可变字符串 # unique=True : 唯一约束 # index=True : 普通索引 # default=1 : 默认值 # nullable=False : 是否允许为空
# model.py
from .exts import db
# 类 => 表
# 类属性 => 表字段
# 对象 => 表的一条数据
class User(db.Model):
__tablename__ = 'user' # 表名
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(30), unique=True)
age = db.Column(db.Integer, default=1)
def __repr__(self):
return self.name
from flask import Blueprint, request, render_template from sqlalchemy import desc, and_, not_, or_ from .models import * # 蓝图 blue = Blueprint('user', __name__) @blue.route('/') def index(): return 'index' # 单表操作: # 增删改查 # 增:添加数据 @blue.route('/useradd/') def user_add(): # 添加一条数据 # u = User() # u.name = 'kun' # u.age = 24 # db.session.add(u) # 将u对象添加到session中 # db.session.commit() # 同步到数据库中 # 同时添加多条数据 users = [] for i in range(10, 30): u = User() u.name = '冰冰' + str(i) u.age = i users.append(u) try: db.session.add_all(users) db.session.commit() # 事务提交 except Exception as e: db.session.rollback() # 回滚 db.session.flush() return 'fail: ' + str(e) return 'success!' # 删:删除数据 # 找到要删除的数据,然后删除 @blue.route('/userdel/') def user_del(): u = User.query.first() # 查询第一条数据 db.session.delete(u) db.session.commit( return 'success!' # 改:修改数据 # 找到要修改的数据,然后修改 @blue.route('/userupdate/') def user_update(): u = User.query.first() # 查询第一条数据 u.age = 1000 db.session.commit() return 'success!' # 查:查询数据 # 条件 @blue.route('/userget/') def user_get(): # all(): 返回所有数据,返回列表 users = User.query.all() # print(users, type(users)) # <class 'list'> # print(User.query, type(User.query)) # <class 'flask_sqlalchemy.query.Query'> # filter() : 过滤,得到查询集,类似SQL中的where users = User.query.filter() # print(users, type(users)) # 查询集 # print(list(users)) # get():查询到对应主键的数据对象 user = User.query.get(8) # print(user, type(user)) # User对象 <class 'App.models.User'> # print(user.name, user.age) # 获取数据的属性 # filter() : 类似SQL中的where # filter_by() : 用于等值操作的过滤 # users = User.query.filter(User.age==20) # users = User.query.filter_by(age=20) users = User.query.filter(User.age>20) # 可以用于非等值操作 # print(list(users)) # [冰冰20] # first() : 第一条数据 # first_or_404(): 第一条数据,如果不存在则抛出404错误 user = User.query.first() # user = User.query.filter_by(age=100).first_or_404() # print(user) # count(): 统计查询集中的数据条数 users = User.query.filter() # print(users.count()) # 20 # limit() : 前几条 # offset() : 跳过前几条 users = User.query.offset(3).limit(4) # print(list(users)) # order_by() : 排序 users = User.query.order_by('age') # 升序 users = User.query.order_by(desc('age')) # 降序 # print(list(users)) # 逻辑运算:and_,or_,not_ users = User.query.filter(User.age>20, User.age<25) # 且,常用 users = User.query.filter(and_(User.age>20, User.age<25)) # 且 users = User.query.filter(or_(User.age>25, User.age<20)) # 或 users = User.query.filter(not_(or_(User.age>25, User.age<20))) # 非 # print(list(users)) # 查询属性 # contains('3'): 模糊查找,类似SQL中的like users = User.query.filter(User.name.contains('3')) # in_(): 其中之一 users = User.query.filter(User.age.in_([10, 20, 30, 40, 50])) # startswith() : 以某子串开头 # endswith() : 以某子串结尾 users = User.query.filter(User.name.startswith('冰')) # users = User.query.filter(User.name.endswith('2')) # print(list(users)) # __gt__: 大于 users = User.query.filter(User.age.__gt__(25)) print(list(users)) return 'success' # 分页,翻页 # 1.手动翻页 # offset().limit() # 数据: 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20 # 页码:page=1 # 每页显示数量:per_page=5 # page=1 : 1,2,3,4,5 => offset(0).limit(5) # page=2 : 6,7,8,9,10 => offset(5).limit(5) # page=3 : 11,12,13,14,15 => offset(10).limit(5) # page=4 : 16,17,18,19,20 => offset(15).limit(5) # ... .... # page=n : => offset((page-1)*per_page).limit(per_page) # 2.paginate对象 @blue.route('/paginate/') def get_paginate(): # 页码:默认显示第一页 page = int(request.args.get('page', 1)) # per_page: 每页显示数据量 per_page = int(request.args.get('per_page', 5)) # print(page, type(page)) # print(per_page, type(per_page)) # paginate() p = User.query.paginate(page=page, per_page=per_page, error_out=False) # paginate对象的属性: # items:返回当前页的内容列表 print(p.items) # has_next:是否还有下一页 # print(p.has_next) # has_prev:是否还有上一页 # print(p.has_prev) # next(error_out=False):返回下一页的Pagination对象 # print(p.next(error_out=False).items) # prev(error_out=False):返回上一页的Pagination对象 # print(p.prev(error_out=False).items) # page:当前页的页码(从1开始) print(p.page) # pages:总页数 print(p.pages) # per_page:每页显示的数量 # print(p.per_page) # prev_num:上一页页码数 # print(p.prev_num) # next_num:下一页页码数 # print(p.next_num) # total:查询返回的记录总数 print(p.total) return render_template('paginate.html', p=p)
# models.py # 多表关系 # ----------------------- 一对多 ----------------------- # # 班级:学生 = 1:N # 班级表 class Grade(db.Model): __tablename__ = 'grade' # 表名 id = db.Column(db.Integer, primary_key=True, autoincrement=True) name = db.Column(db.String(30), unique=True) # 建立关联 # 第1个参数:关联的模型名(表) # 第2个参数:反向引用的名称,grade对象, # 让student去反过来得到grade对象的名称: student.grade # 第3个参数:懒加载 # 这里的students不是字段 students = db.relationship('Student', backref='grade', lazy=True) # 学生表 class Student(db.Model): __tablename__ = 'student' id = db.Column(db.Integer, primary_key=True, autoincrement=True) name = db.Column(db.String(30), unique=True) age = db.Column(db.Integer) # 外键:跟Grade表中的id字段关联 gradeid = db.Column(db.Integer, db.ForeignKey(Grade.id))
# 查询 @blue.route('/getstu/') def get_stu(): # 查询某学生所在的班级: 反向引用grade stu = Student.query.get(2) # print(stu.name, stu.age) # print(stu.gradeid, stu.grade, stu.grade.name, stu.grade.id) # 查找某班级下的所有学生 grade = Grade.query.get(32) print(grade.name) print(grade.students) # 所有学生 for stu in grade.students: print(stu.name, stu.age) return 'OK'
# models.py # 用户收藏电影 # 用户 : 电影 = N : M # 中间表:收藏表 collect = db.Table( 'collects', db.Column('user_id', db.Integer, db.ForeignKey('usermodel.id'), primary_key=True), db.Column('movie_id', db.Integer, db.ForeignKey('movie.id'), primary_key=True) ) # 用户表 class UserModel(db.Model): __tablename__ = 'usermodel' id = db.Column(db.Integer, primary_key=True, autoincrement=True) name = db.Column(db.String(30)) age = db.Column(db.Integer) # 电影表 class Movie(db.Model): __tablename__ = 'movie' id = db.Column(db.Integer, primary_key=True, autoincrement=True) name = db.Column(db.String(30)) # 关联 # secondary=collect: 设置中间表 # users = db.relationship('UserModel', backref='movies', lazy=True, secondary=collect) users = db.relationship('UserModel', backref='movies', lazy='dynamic', secondary=collect) # lazy属性: # 懒加载,可以延迟在使用关联属性的时候才建立关联 # lazy='dynamic': 会返回一个query对象(查询集),可以继续使用其他查询方法,如all(). # lazy='select': 首次访问到属性的时候,就会全部加载该属性的数据. # lazy='joined': 在对关联的两个表进行join操作,从而获取到所有相关的对象 # lazy=True: 返回一个可用的列表对象,同select
# 添加数据 @blue.route('/adduser/') def add_user(): # 添加用户 users = [] for i in range(10, 14): user = UserModel() user.name = 'Lucy-{}'.format(i) user.age = i users.append(user) try: db.session.add_all(users) db.session.commit() except Exception as e: print('e:', e) db.session.rollback() db.session.flush() return 'OK' @blue.route('/addmovie/') def add_movie(): # 添加电影 movies = [] for i in range(10, 14): moive = Movie() moive.name = '阿凡达-{}'.format(i) movies.append(moive) try: db.session.add_all(movies) db.session.commit() except Exception as e: print('e:', e) db.session.rollback() db.session.flush() return 'OK' @blue.route('/addcollect/') def add_collect(): # 用户收藏电影 user = UserModel.query.get(1) movie = Movie.query.get(1) user.movies.append(movie) db.session.commit() return 'OK' # 查询 @blue.route('/getcollect/') def get_collect(): # 查找某用户收藏的所有电影 user = UserModel.query.get(1) print(user.movies) # 查找收藏了某电影的所有用户 movie = Movie.query.get(4) print(movie.users) print(list(movie.users)) return 'OK' # 修改:和单表操作 # 删除 @blue.route('/deluser/') def del_user(): # 级联删除 user = UserModel.query.get(1) db.session.delete(user) db.session.commit() return 'OK'
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。