当前位置:   article > 正文

【小沐学Python】Python实现Web服务器(Flask框架扩展:Flask-SQLAlchemy)

【小沐学Python】Python实现Web服务器(Flask框架扩展:Flask-SQLAlchemy)

在这里插入图片描述

1、简介

  • SQLAlchemy

SQLALchemy 实际上是对数据库的抽象,让开发者不用直接和 SQL 语句打交道,而是通过 Python 对象来操作数据库,在舍弃一些性能开销的同时,换来的是开发效率的较大提升。

SQLAlchemy 是 Python SQL 工具包和对象关系映射器,它为应用程序开发人员提供了 SQL 的全部功能和灵活性。它提供了一整套众所周知的企业级持久性模式,专为高效和高性能的数据库访问而设计,并适应为简单的 Python 域语言。

  • Flask-SQLAlchemy

SQLAlchemy是一个关系型数据库框架,它提供了高层的 ORM 和底层的原生数据库的操作。flask-sqlalchemy 是一个简化了 SQLAlchemy 操作的flask扩展。

Flask-SQLALchemy 是一个给你的应用添加 SQLALchemy 支持的 Flask 扩展。 它需要 SQLAlchemy 0.6 或更高的版本。它致力于简化在 Flask 中 SQLAlchemy 的 使用,提供了有用的默认值和额外的助手来更简单地完成日常任务。

常见情况下对于只有一个 Flask 应用,所有您需要做的事情就是创建 Flask 应用,选择加载配置接着创建 SQLAlchemy 对象时候把 Flask 应用传递给它作为参数。一旦创建,这个对象就包含 sqlalchemy 和 sqlalchemy.orm 中的所有函数和助手。此外它还提供一个名为 Model 的类,用于作为声明模型时的 delarative 基类:

from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:tmp/test.db'
db = SQLAlchemy(app)


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    email = db.Column(db.String(120), unique=True)

    def __init__(self, username, email):
        self.username = username
        self.email = email

    def __repr__(self):
        return '<User %r>' % self.username
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2、安装

pip install flask-sqlalchemy
  • 1

3、开发

3.1 数据库连接字符串

常用数据库连接字符串如下:

  • mysql
# default
engine = create_engine("mysql://scott:tiger@localhost/foo")

# mysqlclient (a maintained fork of MySQL-Python)
engine = create_engine("mysql+mysqldb://scott:tiger@localhost/foo")

# PyMySQL
engine = create_engine("mysql+pymysql://scott:tiger@localhost/foo")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • oracle
engine = create_engine("oracle://scott:tiger@127.0.0.1:1521/sidname")

engine = create_engine("oracle+cx_oracle://scott:tiger@tnsname")
  • 1
  • 2
  • 3
  • Microsoft SQL Server
# pyodbc
engine = create_engine("mssql+pyodbc://scott:tiger@mydsn")

# pymssql
engine = create_engine("mssql+pymssql://scott:tiger@hostname:port/dbname")
  • 1
  • 2
  • 3
  • 4
  • 5
  • SQLite
# sqlite://<nohostname>/<path>
# where <path> is relative:
engine = create_engine("sqlite:///foo.db")

# Unix/Mac - 4 initial slashes in total
engine = create_engine("sqlite:absolute/path/to/foo.db")

# Windows
engine = create_engine("sqlite:///C:\\path\\to\\foo.db")

# Windows alternative using raw string
engine = create_engine(r"sqlite:///C:\path\to\foo.db")

# To use a SQLite :memory: database, specify an empty URL:
engine = create_engine("sqlite://")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • PostgreSQL
# default
engine = create_engine("postgresql://scott:tiger@localhost/mydatabase")

# psycopg2
engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydatabase")

# pg8000
engine = create_engine("postgresql+pg8000://scott:tiger@localhost/mydatabase")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3.2 SQLAlchemy参数设置

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'
# 动态追踪修改设置,如未设置只会提示警告
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
#查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

常用参数设置如下:

名字备注
SQLALCHEMY_DATABASE_URI用于连接的数据库 URI 。例如:sqlite:tmp/test.dbmysql://username:password@server/db
SQLALCHEMY_BINDS一个映射 binds 到连接 URI 的字典。更多 binds 的信息见用 Binds 操作多个数据库。
SQLALCHEMY_ECHO如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,这对调试有用。(打印sql语句)
SQLALCHEMY_RECORD_QUERIES可以用于显式地禁用或启用查询记录。查询记录 在调试或测试模式自动启用。更多信息见get_debug_queries()。
SQLALCHEMY_NATIVE_UNICODE可以用于显式禁用原生 unicode 支持。当使用 不合适的指定无编码的数据库默认值时,这对于 一些数据库适配器是必须的(比如 Ubuntu 上 某些版本的 PostgreSQL )。
SQLALCHEMY_POOL_SIZE数据库连接池的大小。默认是引擎默认值(通常 是 5 )
SQLALCHEMY_POOL_TIMEOUT设定连接池的连接超时时间。默认是 10 。
SQLALCHEMY_POOL_RECYCLE多少秒后自动回收连接。这对 MySQL 是必要的, 它默认移除闲置多于 8 小时的连接。注意如果 使用了 MySQL , Flask-SQLALchemy 自动设定 这个值为 2 小时。

3.3 SQLAlchemy字段类型

常用字段类型如下:

类型名python中类型说明
Integerint普通整数,一般是32位
SmallIntegerint取值范围小的整数,一般是16位
BigIntegerint或long不限制精度的整数
Floatfloat浮点数
Numericdecimal.Decimal普通整数,一般是32位
Stringstr变长字符串
Textstr变长字符串,对较长或不限长度的字符串做了优化
Unicodeunicode变长Unicode字符串
UnicodeTextunicode变长Unicode字符串,对较长或不限长度的字符串做了优化
Booleanbool布尔值
Datedatetime.date时间
Timedatetime.datetime日期和时间
LargeBinarystr二进制文件

3.4 SQLAlchemy列选项

常用的SQLAlchemy列选项

选项名说明
primary_key如果为True,代表表的主键
unique如果为True,代表这列不允许出现重复的值
index如果为True,为这列创建索引,提高查询效率
nullable如果为True,允许有空值,如果为False,不允许有空值
default为这列定义默认值

3.5 SQLAlchemy关系选项

选项名说明
backref在关系的另一模型中添加反向引用
primary join明确指定两个模型之间使用的联结条件
uselist如果为False,不使用列表,而使用标量值
order_by指定关系中记录的排序方式
secondary指定多对多中记录的排序方式
secondary join在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件

3.6 SQLAlchemy操作接口

(1)声明一个模型

  • 简单例子
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    email = db.Column(db.String(120), unique=True)
    age = db.Column(db.Integer)
    
    def __init__(self, username, email):
        self.username = username
        self.email = email

    def __repr__(self):
        return '<User %r>' % self.username
        # return "(%s, %s, %s, %s)" % (self.id, self.name, self.email, self.age)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 一对多关系
class Person(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50))
    addresses = db.relationship('Address', backref='person',
                                lazy='dynamic')

class Address(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(50))
    person_id = db.Column(db.Integer, db.ForeignKey('person.id'))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 多对多关系
tags = db.Table('tags',
    db.Column('tag_id', db.Integer, db.ForeignKey('tag.id')),
    db.Column('page_id', db.Integer, db.ForeignKey('page.id'))
)

class Page(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    tags = db.relationship('Tag', secondary=tags,
        backref=db.backref('pages', lazy='dynamic'))

class Tag(db.Model):
    id = db.Column(db.Integer, primary_key=True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

(2)创建数据库
SQLAlchemy.create_all() 和 SQLAlchemy.drop_all(),根据模型用来创建以及删除表格的方法。

db.drop_all()
db.create_all()
  • 1
  • 2

(3)执行CRUD操作
SQLAlchemy的Session对象管理ORM对象的所有持久性操作。
session方法执行CRUD操作:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:mysql@127.0.0.1:3306/python39"
app.config["SQLALCHEMY_ECHO"] = True
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

db = SQLAlchemy(app)

# 模型类
class User(db.Model):
    __tablename__ = "user"
	#############
	
@app.route("/add")
def add():
	#############
	
@app.route("/delete")
def delete():
	#############
	
@app.route("/update")
def update():
	#############
	
@app.route("/query")	
def query():
	#############
	
if __name__ == '__main__':
    db.drop_all()  # 删除所有表,为方便测试,实际勿用
    db.create_all()  # 创建所有表
    app.run(debug=True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 增加记录
from yourapp import User
me = User('admin', 'admin@example.com', 17)
db.session.add(me)
db.session.commit()

# 在flask视图中增加
@app.route("/add")
def add():
    user1 = User(username='wang', email='wang@163.com', age=20)
    user2 = User(username='zhang', email='zhang@189.com', age=33)
    user3 = User(username='chen', email='chen@126.com', age=23)
    user4 = User(username='zhou', email='zhou@163.com', age=29)
    user5 = User(username='tang', email='tang@itheima.com', age=25)
    user6 = User(username='wu', email='wu@gmail.com', age=25)
    user7 = User(username='qian', email='qian@gmail.com', age=23)
    user8 = User(username='liu', email='liu@itheima.com', age=30)
    user9 = User(username='li', email='li@163.com', age=28)
    user10 = User(username='sun', email='sun@163.com', age=26)
    db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10])
    db.session.commit()
    return "add success"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 删除记录
db.session.delete(me)
db.session.commit()

@app.route("/delete")
def delete():
    # 方案1:
    user = User.query.filter(User.username== "tomcat").first()
    db.session.delete(user )
    db.session.commit()

    # 方案2(推荐):
    User.query.filter(User.username == "li").delete()
    db.session.commit()

    return "delete success"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 更新记录
# 在flask视图中查询
@app.route("/update")
def update():
    # 方案1:先查询模型对象,修改对象属性,再提交到数据库
    user = User.query.filter(User.username== "tomcat").first()
    user.count -= 1
    db.session.add(user )  # 可以省略这步
    db.session.commit()

    # 方案2(推荐):
    User.query.filter(User.username== "li").update({"age": User.age - 1})
    db.session.commit()
    return "update success"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 查询记录
    Flask-SQLAlchemy 在你的 Model 类上提供了一个 query 属性。
    当你访问它时,你会得到一个新的针对所有记录的查询对象。
    你可以使用:sqlalchemy.orm.query.Query.all 或:sqlalchemy.orm.query.Query.first。
    你可以使用:sqlalchemy.orm.query.Query.filter 的方法来过滤记录。
    如果你想要用主键查询,你也可以使用:sqlalchemy.orm.query.Query.get 。
# 通过用户名查用户,对不存在的用户名返回 None。
peter = User.query.filter_by(username='peter').first()
print(peter.id)
print(peter.email)

# 以更复杂的表达式选取一些用户:
User.query.filter(User.email.endswith('@example.com')).all()

# 以某种规则对用户排序:
User.query.order_by(User.username)

# 限制返回的用户数目:
User.query.limit(1).all()

# 用主键获取用户:
User.query.get(1)

# 在flask视图中查询
@app.route('/user/<username>')
def show_user(username):
    user = User.query.filter_by(username=username).first_or_404()
    return render_template('show_user.html', user=user)

@app.route("/query")
def query():
    # 简单查询
    User.query.all()  # 查询所有用户数据
    User.query.count()  # 查询有多少个用户
    User.query.first()  # 查询第1个用户

    # 根据id查询  返回模型对象/None
    User.query.get(5)
    User.query.filter_by(id=5).all()
    User.query.filter(User.id == 5).first()

    # 查询名字以某个字符开始/结尾/包含的所有用户
    User.query.filter(User.username.endswith("g")).all()
    User.query.filter(User.username.startswith("w")).all()
    User.query.filter(User.username.contains("n")).all()
    User.query.filter(User.username.like("w%n%g")).all()  # 模糊查询

    # 查询名字不等于wang的所有用户
    from sqlalchemy import not_, and_, or_
    User.query.filter(not_(User.username== "li")).all()
    User.query.filter(User.username!= "li").all()

    User.query.filter(User.id.in_([1, 5, 6])).all()  # 查询id在某个范围的用户
    # 排序查询
    User.query.order_by(User.age, User.id.desc()).limit(5).all()  # 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个
    # 分页查询
    pn = User.query.paginate(3, 3)
    # pn.pages 总页数  pn.page 当前页码 pn.items 当前页的数据  pn.total 总条数

	# 去重
	db.query(distinct(User.username)).all()
	db.query(User.username,User.age).distinct(User.username, User.age).filter(xxx).all()
	
    # 聚合查询
    # 查询每个年龄的人数    select age, count(username) from t_user group by age  分组聚合
    from sqlalchemy import func
    data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all()
    for item in data:
        # print(item[0], item[1])
        print(item.age, item.count)

    # 只查询所有人的姓名和邮箱  优化查询   User.query.all()  # 相当于select *
    from sqlalchemy.orm import load_only
    data = User.query.options(load_only(User.username, User.email)).all()
    # 另一种写法: data = session.query(User).with_entities(User.id).filter(xxx).all()
    for item in data:
        print(item.username, item.email)

    data = db.session.query(User.username, User.email).all()
    for item in data:
        print(item.username, item.email)

    return "query success"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

4、代码测试

http://www.pythondoc.com/flask-sqlalchemy/quickstart.html
https://flask.net.cn/patterns/sqlalchemy.html

4.1 用户管理

  • app.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite3'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

    def __repr__(self):
        return '<User %r>' % self.username
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • list.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<table border="1">
    <tr>
        <th>编号</th>
        <th>用户名</th>
        <th>邮箱</th>
        <th>操作</th>
    </tr>
    {% for u in users %}
        <tr>
            <td>{{ u.id }}</td>
            <td>{{ u.username }}</td>
            <td>{{ u.email }}</td>
            <td>
                <a href="delete/{{ u.id }}">删除</a>
            </td>
        </tr>
    {% endfor %}
</table>
</body>
</html>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 实现查询操作
from initdb import db, app, User
from flask import render_template


@app.route("/")
def find_all_users():
    users = User.query.all()
    print(users)
    return render_template("list.html", users=users)


if __name__ == '__main__':
    app.run()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 执行查询单个用户的操作
@app.route("/get/<int:get_id>")
def get_by_id(get_id):
    get_user = User.query.get(get_id)  # User.query.filter_by(id=get_id).first()
    return "编号:{0},用戶名:{1},邮箱:{2}".format(get_user.id, get_user.username, get_user.email)


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 执行添加操作
@app.route("/add/<username>")
def add_user(username):
    new_user = User()
    new_user.username = username
    new_user.email = username + "@qq.com"
    db.session.add(new_user)
    db.session.commit()
    return redirect("/")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 执行删除操作
@app.route("/delete/<int:del_id>")
def delete_by_id(del_id):
    del_user = User.query.filter_by(id=del_id).first()
    if del_user is not None:
        db.session.delete(del_user)
        db.session.commit()
    return redirect("/")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

执行修改操作

@app.route("/update", methods=["POST"])
def update_user():
    if request.form["id"]:
        update_id = request.form["id"]
        update_user = User.query.get(update_id)
        update_user.username = request.form["username"]
        update_user.email = request.form["email"]
        db.session.commit()
    return redirect("/")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4.2 用户角色管理

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import pymysql
pymysql.install_as_MySQLdb()

app = Flask(__name__)

class Config(object):
    """配置参数"""
    # 设置连接数据库的URL
    user = 'root'
    password = '123456'
    database = 'flask_users'
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://%s:%s@127.0.0.1:3306/%s' % (user,password,database)

    # 设置sqlalchemy自动更跟踪数据库
    SQLALCHEMY_TRACK_MODIFICATIONS = True

    # 查询时会显示原始SQL语句
    app.config['SQLALCHEMY_ECHO'] = True

    # 禁止自动提交数据处理
    app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = False

# 读取配置
app.config.from_object(Config)

# 创建数据库sqlalchemy工具对象
db = SQLAlchemy(app)

class Role(db.Model):
    # 定义表名
    __tablename__ = 'roles'
    # 定义字段
    id = db.Column(db.Integer, primary_key=True,autoincrement=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User',backref='role') # 反推与role关联的多个User模型对象

class User(db.Model):
    # 定义表名
    __tablename__ = 'users'
    # 定义字段
    id = db.Column(db.Integer, primary_key=True,autoincrement=True)
    name = db.Column(db.String(64), unique=True, index=True)
    email = db.Column(db.String(64),unique=True)
    pswd = db.Column(db.String(64))
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) # 设置外键

if __name__ == '__main__':

    # 删除所有表
    db.drop_all()

    # 创建所有表
    db.create_all()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 添加初始数据
# 插入一条角色数据
role1 = Role(name='admin')
db.session.add(role1)
db.session.commit()

# 再次插入一条数据
role2 = Role(name='user')
db.session.add(role2)
db.session.commit()

# 一次性插入多条数据
user1 = User(name='wang',email='wang@163.com',pswd='111',role_id=role1.id)
user2 = User(name='zhang',email='zhang@189.com',pswd='222',role_id=role2.id)
user3 = User(name='chen',email='chen@126.com',pswd='333',role_id=role2.id)
user4 = User(name='zhou',email='zhou@163.com',pswd='444',role_id=role1.id)
db.session.add_all([user1,user2,user3,user4])
db.session.commit()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 返回名字等于wang的所有user
User.query.filter_by(name='wang').all()
  • 1
  • 返回查询到的第一个对象
User.query.first()
  • 1
  • 返回查询到的所有对象
User.query.all()
  • 1
  • 返回名字结尾字符为g的所有数据
User.query.filter(User.name.endswith('g')).all()
  • 1
  • 返回名字不等于wang的所有数据
User.query.filter(User.name!='wang').all()
  • 1
  • 返回and*()条件满足的所有数据
from sqlalchemy import and_
User.query.filter(and_(User.name!='wang',User.email.endswith('163.com'))).all()
  • 1
  • 2
  • 逻辑或,需要导入or_
from sqlalchemy import or_
User.query.filter(or_(User.name!='wang',User.email.endswith('163.com'))).all()
for user in users:
	print(user.name, user.email)
  • 1
  • 2
  • 3
  • 4
  • not_ 相当于取反
from sqlalchemy import not_
User.query.filter(not_(User.name=='chen')).all()
  • 1
  • 2
  • 删除数据
user = User.query.first()
db.session.delete(user)
db.session.commit()
  • 1
  • 2
  • 3
  • 更新数据
user = User.query.first()
user.name = 'li'
db.session.commit()

# or
User.query.filter_by(name='zhang').update({'name':'li'})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 关联查询角色的所有用户
#查询roles表id为1的角色
role1 = Role.query.get(1)
#查询该角色的所有用户
role1.users
  • 1
  • 2
  • 3
  • 4
  • 关联查询用户所属角色
#查询users表id为3的用户
user1 = User.query.get(3)
#查询用户属于什么角色
user1.role
  • 1
  • 2
  • 3
  • 4

4.3 学生管理

  • 导入SQLAlchemy类。
from flask_sqlalchemy import SQLAlchemy
  • 1
  • 现在创建一个Flask应用程序对象并为要使用的数据库设置URI。
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///students.sqlite3'
  • 1
  • 2
  • 创建了students 模型
db = SQLAlchemy(app)
class Students(db.Model):
    id = db.Column('student_id', db.Integer, primary_key = True)
    name = db.Column(db.String(100))
    city = db.Column(db.String(50))  
    addr = db.Column(db.String(200))
    pin = db.Column(db.String(10))

    def __init__(self, name, city, addr, pin):
        self.name = name
        self.city = city
        self.addr = addr
        self.pin = pin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 创建数据库
db.create_all()
  • 1
  • 添加主路由接口
@app.route('/')
def show_all():
   return render_template('show_all.html', students = students.query.all() )
  • 1
  • 2
  • 3
  • 添加主路由接口对应的网页模板: show_all.html
<!DOCTYPE html>
<html lang = "en">
   <head></head>
   <body>
      
      <h3>
         <a href = "{{ url_for('show_all') }}">Comments - Flask 
            SQLAlchemy example</a>
      </h3>
      
      <hr/>
      {%- for message in get_flashed_messages() %}
         {{ message }}
      {%- endfor %}
		
      <h3>Students (<a href = "{{ url_for('new') }}">Add Student
         </a>)</h3>
      
      <table>
         <thead>
            <tr>
               <th>Name</th>
               <th>City</th>
               <th>Address</th>
               <th>Pin</th>
            </tr>
         </thead>
         
         <tbody>
            {% for student in students %}
               <tr>
                  <td>{{ student.name }}</td>
                  <td>{{ student.city }}</td>
                  <td>{{ student.addr }}</td>
                  <td>{{ student.pin }}</td>
               </tr>
            {% endfor %}
         </tbody>
      </table>
      
   </body>
</html>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 添加新建路由接口
@app.route('/new', methods = ['GET', 'POST'])
def new():
   if request.method == 'POST':
      if not request.form['name'] or not request.form['city'] or not request.form['addr']:
         flash('Please enter all the fields', 'error')
      else:
         student = students(request.form['name'], request.form['city'],
            request.form['addr'], request.form['pin'])
         
         db.session.add(student)
         db.session.commit()
         
         flash('Record was successfully added')
         return redirect(url_for('show_all'))
   return render_template('new.html')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 添加新建路由接口对应的网页模板: new.html
<!DOCTYPE html>
<html>
   <body>
   
      <h3>Students - Flask SQLAlchemy example</h3>
      <hr/>
      
      {%- for category, message in get_flashed_messages(with_categories = true) %}
         <div class = "alert alert-danger">
            {{ message }}
         </div>
      {%- endfor %}
      
      <form action = "{{ request.path }}" method = "post">
         <label for = "name">Name</label><br>
         <input type = "text" name = "name" placeholder = "Name" /><br>
         <label for = "city">City</label><br>
         <input type = "text" name = "city" placeholder = "city" /><br>
         <label for = "addr">addr</label><br>
         <textarea name = "addr" placeholder = "addr"></textarea><br>
         <label for = "pin">City</label><br>
         <input type = "text" name = "pin" placeholder = "pin" /><br>
         <input type = "submit" value = "Submit" />
      </form>
      
   </body>
</html>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 完整的测试程序文件app.py代码如下:
from flask import Flask, request, flash, url_for, redirect, render_template
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///students.sqlite3'
app.config['SECRET_KEY'] = "random string"

db = SQLAlchemy(app)

class students(db.Model):
   id = db.Column('student_id', db.Integer, primary_key = True)
   name = db.Column(db.String(100))
   city = db.Column(db.String(50))
   addr = db.Column(db.String(200)) 
   pin = db.Column(db.String(10))

def __init__(self, name, city, addr,pin):
   self.name = name
   self.city = city
   self.addr = addr
   self.pin = pin

@app.route('/')
def show_all():
   return render_template('show_all.html', students = students.query.all() )

@app.route('/new', methods = ['GET', 'POST'])
def new():
   if request.method == 'POST':
      if not request.form['name'] or not request.form['city'] or not request.form['addr']:
         flash('Please enter all the fields', 'error')
      else:
         student = students(request.form['name'], request.form['city'],
            request.form['addr'], request.form['pin'])
         
         db.session.add(student)
         db.session.commit()
         flash('Record was successfully added')
         return redirect(url_for('show_all'))
   return render_template('new.html')

if __name__ == '__main__':
   db.create_all()
   app.run(debug = True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

4.4 图书管理

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import config
from datetime import datetime

app = Flask(__name__)
app.config.from_object(config)
db = SQLAlchemy(app)

class Book(db.Model):
    __tablename__ = 'book'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    title = db.Column(db.String(50), nullable=False)
    publishing_office = db.Column(db.String(100), nullable=False)
    price = db.Column(db.String(30), nullable=False)
    isbn = db.Column(db.String(50), nullable=False)
    storage_time = db.Column(db.DateTime, default=datetime.now)
db.create_all()

#添加数据的路由
@app.route('/add')
def add():
    book1 = Book(title="Python基础教程(第3版)", publishing_office="人民邮电出版社", price="68.30", isbn="9787115474889")
    book2 = Book(title="Python游戏编程快速上手 第4版", publishing_office="人民邮电出版社", price="54.50", isbn="9787115466419")
    book3 = Book(title="JSP应用开发", publishing_office="清华大学出版社", price="68.30", isbn="9787302384496")
    db.session.add(book1)
    db.session.add(book2)
    db.session.add(book3)
    db.session.commit()
    return "添加数据成功!"

@app.route('/')
def index():
    return "Hello!"

@app.route('/select')
def select():
    result = Book.query.filter(Book.id=="1").first()
    print(result.title)
    return "查询数据成功!"

@app.route('/edit')
def edit():
    book1 = Book.query.filter(Book.id=="1").first()
    book1.price = 168
    db.session.commit()
    return "修改数据成功!"

@app.route('/delete')
def delete():
    book1 = Book.query.filter(Book.id=="2").first()
    db.session.delete(book1)
    db.session.commit()
    return "删除数据成功!"

if __name__ == '__main__':
    app.run(debug=True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

结语

如果您觉得该方法或代码有一点点用处,可以给作者点个赞,或打赏杯咖啡;╮( ̄▽ ̄)╭
如果您感觉方法或代码不咋地//(ㄒoㄒ)//,就在评论处留言,作者继续改进;o_O???
如果您需要相关功能的代码定制化开发,可以留言私信作者;(✿◡‿◡)
感谢各位大佬童鞋们的支持!( ´ ▽´ )ノ ( ´ ▽´)っ!!!

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/185773
推荐阅读
相关标签
  

闽ICP备14008679号