赞
踩
目录
pip install flask-sqlalchemy
pip install Pymysql
不用写SQL语句,底层代码强大(方便更改、维护和迁移)
目前,许多主流的语言都实现了对象关系映射(Object Relational Mapper,简称ORM)的库包。ORM的主要功能是将数据库表中的每条记录映射成一个对象,所有的数据库操作都转化为对象的操作,这样可以增加代码的可读性和安全性。
ORM
优点:
- 简洁易读:将数据表抽象为对象(数据模型),更直观易读。
- 可移植:封装了多种数据库引擎,面对多个数据库,操作基本一致,代码易维护。
- 更安全:有效避免SQL注入。
当然,ORM在性能上会低于直接执行SQL语句。本文介绍SQLAlchemy的一些基础操作。
<协议名称>://<⽤户名>:<密码>@<ip地址>:<端⼝>/<数据库名>
如果使用的是mysqldb驱动,协议名: mysql
如果使用的是pymysql驱动,协议名: mysql+pymysql
"""Minimal Flask app that checks the MySQL connection through Flask-SQLAlchemy."""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database connection settings (MySQL through the PyMySQL driver).
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)

# Connectivity check: run a trivial query once, when the module is loaded.
# `db.text` is used instead of `SQLAlchemy().text`, which created a second,
# unconfigured extension object only to reach the same function.
with app.app_context():
    with db.engine.connect() as conn:
        rs = conn.execute(db.text("select 1"))
        print(rs.fetchone())


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
此时输出为(1,)
如图
Integer:整型,映射到数据库中是int类型
Float:浮点类型,float
Double; String; Boolean;
Decimal: 定点类型,专门为解决浮点类型精度丢失的问题而设定。Decimal需要传入两个参数,第一个参数标记该字段能存储多少位数,第二个参数表示小数点后有多少个小数位。
Enum:枚举类型;
Date:日期类型,年月日;
DateTime: 时间类型,年月日时分秒毫秒;
Time:时间类型,时分秒;
Text:长字符串,可存储6万多个字符,text;
LongText:长文本类型,longtext.
"""Define the ``User`` model and create its table in MySQL."""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database connection settings (MySQL through the PyMySQL driver).
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)

# Connectivity check from the previous step, left disabled:
# with app.app_context():
#     with db.engine.connect() as conn:
#         rs = conn.execute(db.text("select 1"))
#         print(rs.fetchone())


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), NOT NULL.  No unique constraint is declared on this column.
    username = db.Column(db.String(50), nullable=False)
    # String(120), unique, NOT NULL.
    email = db.Column(db.String(120), unique=True, nullable=False)


# Two ways to build a model instance: keyword arguments, or attribute
# assignment on an empty instance.  These objects are only constructed here;
# nothing in this listing adds them to a session.
user1 = User(username="coleak", email="666@qq.com")
user2 = User()
user2.username = "ayue"
user2.email = "999@qq.com"

# Create the tables for the models defined above.
with app.app_context():
    db.create_all()


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
效果如图
"""Insert rows through the ORM session."""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database connection settings (MySQL through the PyMySQL driver).
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), NOT NULL.  No unique constraint is declared on this column.
    username = db.Column(db.String(50), nullable=False)
    # String(120), unique, NOT NULL.
    email = db.Column(db.String(120), unique=True, nullable=False)


# '/uesr/add' is the path as originally published (a misspelling of "user").
# It stays registered so existing links keep working; '/user/add' is the
# correctly spelled route.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in one transaction.

    ``email`` is declared unique, so calling this again with the same
    addresses is expected to fail at commit.
    """
    user1 = User(username="coleak", email="666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
效果如图
"""Insert and query rows through the ORM session."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database connection settings (MySQL through the PyMySQL driver).
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table (subclass of ``db.Model``)."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), NOT NULL.  No unique constraint is declared on this column.
    username = db.Column(db.String(50), nullable=False)
    # String(120), unique, NOT NULL.
    email = db.Column(db.String(120), unique=True, nullable=False)


# '/uesr/add' is the path as originally published (a misspelling of "user").
# It stays registered so existing links keep working; '/user/add' is the
# correctly spelled route.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in one transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    The matches are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative: lookup by primary key, e.g. /user/query?id=1
    # user = db.session.get(User, request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
http://127.0.0.1:5000/user/query?username=coleak
"""Insert, query and update rows through the ORM session."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database connection settings (MySQL through the PyMySQL driver).
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table (subclass of ``db.Model``)."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), NOT NULL.  No unique constraint is declared on this column.
    username = db.Column(db.String(50), nullable=False)
    # String(120), unique, NOT NULL.
    email = db.Column(db.String(120), unique=True, nullable=False)


# --- Create ---------------------------------------------------------------
# '/uesr/add' is the path as originally published (a misspelling of "user").
# It stays registered so existing links keep working; '/user/add' is the
# correctly spelled route.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in one transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


# --- Read -----------------------------------------------------------------
@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    The matches are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative: lookup by primary key, e.g. /user/query?id=1
    # user = db.session.get(User, request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


# --- Update ---------------------------------------------------------------
@app.route("/user/update")
def update_user():
    """Set the email of the first user matching ``username`` to ``newemail``.

    Returns 400 when either query argument is missing and 404 when no user
    matches, instead of failing with an unhandled exception.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if username is None or newemail is None:
        # email is NOT NULL, so a missing value could never be committed.
        return "username and newemail are required", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # .first() yields None when nothing matches.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


# --- Root route -----------------------------------------------------------
@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
http://127.0.0.1:5000/user/update?username=coleak&newemail=666888@qq.com
"""Insert, query, update and delete rows through the ORM session."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database connection settings (MySQL through the PyMySQL driver).
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table (subclass of ``db.Model``)."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), NOT NULL.  No unique constraint is declared on this column.
    username = db.Column(db.String(50), nullable=False)
    # String(120), unique, NOT NULL.
    email = db.Column(db.String(120), unique=True, nullable=False)


# --- Create ---------------------------------------------------------------
# '/uesr/add' is the path as originally published (a misspelling of "user").
# It stays registered so existing links keep working; '/user/add' is the
# correctly spelled route.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in one transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


# --- Read -----------------------------------------------------------------
@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    The matches are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative: lookup by primary key, e.g. /user/query?id=1
    # user = db.session.get(User, request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


# --- Update ---------------------------------------------------------------
@app.route("/user/update")
def update_user():
    """Set the email of the first user matching ``username`` to ``newemail``.

    Returns 400 when either query argument is missing and 404 when no user
    matches, instead of failing with an unhandled exception.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if username is None or newemail is None:
        # email is NOT NULL, so a missing value could never be committed.
        return "username and newemail are required", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # .first() yields None when nothing matches.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


# --- Delete ---------------------------------------------------------------
@app.route("/user/delete")
def delete_user():
    """Delete the first user whose username equals the ``username`` argument.

    Only one row is removed per call even when several users share the name.
    Returns 404 when no user matches.
    """
    username = request.args.get('username')
    user = User.query.filter_by(username=username).first()
    if user is None:
        # Passing None to session.delete() would raise.
        return "user not found", 404
    db.session.delete(user)
    db.session.commit()
    return "delete--successful"


# --- Root route -----------------------------------------------------------
@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
http://127.0.0.1:5000/user/delete?username=coleak
此时第一个coleak被删除了
"""Add an ``article`` model with a foreign key to ``User`` and create the tables."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database connection settings (MySQL through the PyMySQL driver).
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table (subclass of ``db.Model``)."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), NOT NULL.  No unique constraint is declared on this column.
    username = db.Column(db.String(50), nullable=False)
    # String(120), unique, NOT NULL.
    email = db.Column(db.String(120), unique=True, nullable=False)


class article(db.Model):
    """ORM model mapped to the ``article`` table; a row may reference a user."""

    __tablename__ = "article"
    arid = db.Column(db.Integer, primary_key=True, autoincrement=True)
    artitle = db.Column(db.String(100), nullable=False)
    arcontent = db.Column(db.Text, nullable=False)
    author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # The backref adds an ``articles`` attribute to User that lists the
    # user's articles.
    author = db.relationship("User", backref="articles")


with app.app_context():
    db.create_all()
    # Example of building a user and an article, left disabled.  Attribute
    # names follow the columns declared on the models, and the instance is
    # not named ``article`` so the model class is not rebound.
    # coleak = User(username="coleak", email="678@163.com")
    # post = article()
    # post.artitle = "flask_learn"
    # post.arcontent = "flask_content"
    # post.author = coleak


# --- Create ---------------------------------------------------------------
# '/uesr/add' is the path as originally published (a misspelling of "user").
# It stays registered so existing links keep working; '/user/add' is the
# correctly spelled route.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in one transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


# --- Read -----------------------------------------------------------------
@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    The matches are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative: lookup by primary key, e.g. /user/query?id=1
    # user = db.session.get(User, request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


# --- Update ---------------------------------------------------------------
@app.route("/user/update")
def update_user():
    """Set the email of the first user matching ``username`` to ``newemail``.

    Returns 400 when either query argument is missing and 404 when no user
    matches, instead of failing with an unhandled exception.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if username is None or newemail is None:
        # email is NOT NULL, so a missing value could never be committed.
        return "username and newemail are required", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # .first() yields None when nothing matches.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


# --- Delete ---------------------------------------------------------------
@app.route("/user/delete")
def delete_user():
    """Delete the first user whose username equals the ``username`` argument.

    Returns 404 when no user matches.
    """
    username = request.args.get('username')
    user = User.query.filter_by(username=username).first()
    if user is None:
        # Passing None to session.delete() would raise.
        return "user not found", 404
    db.session.delete(user)
    db.session.commit()
    return "delete--successful"


# --- Root route -----------------------------------------------------------
@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
"""Create and read related rows through the ``User``/``article`` relationship."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Database connection settings (MySQL through the PyMySQL driver).
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)


class User(db.Model):
    """ORM model mapped to the ``user`` table (subclass of ``db.Model``)."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), NOT NULL.  No unique constraint is declared on this column.
    username = db.Column(db.String(50), nullable=False)
    # String(120), unique, NOT NULL.
    email = db.Column(db.String(120), unique=True, nullable=False)


class article(db.Model):
    """ORM model mapped to the ``article`` table; a row may reference a user."""

    __tablename__ = "article"
    arid = db.Column(db.Integer, primary_key=True, autoincrement=True)
    artitle = db.Column(db.String(100), nullable=False)
    arcontent = db.Column(db.Text, nullable=False)
    author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # The backref adds an ``articles`` attribute to User that lists the
    # user's articles.
    author = db.relationship("User", backref="articles")


@app.route('/article/add')
def add_article():
    """Insert two sample articles whose author is the user with id 2.

    Returns 404 when that user does not exist, so no author-less articles
    are written.
    """
    # One primary-key lookup serves both articles.
    author = db.session.get(User, 2)
    if author is None:
        return "author not found", 404
    first = article(artitle="flask_learn", arcontent="flask_content")
    first.author = author
    second = article(artitle="flask_learn2", arcontent="flask_content2")
    second.author = author
    db.session.add_all([first, second])
    db.session.commit()
    return "文章1,2添加成功"


@app.route("/article/query")
def query():
    """Print the title of every article written by the user with id 2.

    Returns 404 when that user does not exist.
    """
    user = db.session.get(User, 2)
    if user is None:
        return "user not found", 404
    # The loop variable is not named ``article`` so it cannot be confused
    # with the model class.
    for post in user.articles:
        print(post.artitle)
    return "文章查找成功"


# --- Create ---------------------------------------------------------------
# '/uesr/add' is the path as originally published (a misspelling of "user").
# It stays registered so existing links keep working; '/user/add' is the
# correctly spelled route.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in one transaction."""
    user1 = User(username="coleak", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


# --- Read -----------------------------------------------------------------
@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    The matches are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative: lookup by primary key, e.g. /user/query?id=1
    # user = db.session.get(User, request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


# --- Update ---------------------------------------------------------------
@app.route("/user/update")
def update_user():
    """Set the email of the first user matching ``username`` to ``newemail``.

    Returns 400 when either query argument is missing and 404 when no user
    matches, instead of failing with an unhandled exception.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if username is None or newemail is None:
        # email is NOT NULL, so a missing value could never be committed.
        return "username and newemail are required", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # .first() yields None when nothing matches.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


# --- Delete ---------------------------------------------------------------
@app.route("/user/delete")
def delete_user():
    """Delete the first user whose username equals the ``username`` argument.

    Returns 404 when no user matches.
    """
    username = request.args.get('username')
    user = User.query.filter_by(username=username).first()
    if user is None:
        # Passing None to session.delete() would raise.
        return "user not found", 404
    db.session.delete(user)
    db.session.commit()
    return "delete--successful"


# --- Root route -----------------------------------------------------------
@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
"""Same application with Flask-Migrate wired in and a new ``passw`` column."""
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

app = Flask(__name__)

# Database connection settings (MySQL through the PyMySQL driver).
HOSTNAME = '127.0.0.1'
PORT = 3306
DATABASE = 'flask'
USERNAME = 'root'
PASSWORD = '123456'
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}"
    "?charset=utf8mb4"
)
db = SQLAlchemy(app)

# Registers the migration commands; they are run from the shell:
#   flask db init
#   flask db migrate
#   flask db upgrade
migrate = Migrate(app, db)


class User(db.Model):
    """ORM model mapped to the ``user`` table (subclass of ``db.Model``)."""

    __tablename__ = 'user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # String(50), NOT NULL.  No unique constraint is declared on this column.
    username = db.Column(db.String(50), nullable=False)
    # String(50), NOT NULL.
    passw = db.Column(db.String(50), nullable=False)
    # String(120), unique, NOT NULL.
    email = db.Column(db.String(120), unique=True, nullable=False)


class article(db.Model):
    """ORM model mapped to the ``article`` table; a row may reference a user."""

    __tablename__ = "article"
    arid = db.Column(db.Integer, primary_key=True, autoincrement=True)
    artitle = db.Column(db.String(100), nullable=False)
    arcontent = db.Column(db.Text, nullable=False)
    author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # The backref adds an ``articles`` attribute to User that lists the
    # user's articles.
    author = db.relationship("User", backref="articles")


@app.route('/article/add')
def add_article():
    """Insert two sample articles whose author is the user with id 2.

    Returns 404 when that user does not exist, so no author-less articles
    are written.
    """
    # One primary-key lookup serves both articles.
    author = db.session.get(User, 2)
    if author is None:
        return "author not found", 404
    first = article(artitle="flask_learn", arcontent="flask_content")
    first.author = author
    second = article(artitle="flask_learn2", arcontent="flask_content2")
    second.author = author
    db.session.add_all([first, second])
    db.session.commit()
    return "文章1,2添加成功"


@app.route("/article/query")
def query():
    """Print the title of every article written by the user with id 2.

    Returns 404 when that user does not exist.
    """
    user = db.session.get(User, 2)
    if user is None:
        return "user not found", 404
    # The loop variable is not named ``article`` so it cannot be confused
    # with the model class.
    for post in user.articles:
        print(post.artitle)
    return "文章查找成功"


# --- Create ---------------------------------------------------------------
# '/uesr/add' is the path as originally published (a misspelling of "user").
# It stays registered so existing links keep working; '/user/add' is the
# correctly spelled route.
@app.route('/user/add')
@app.route('/uesr/add')
def add_user():
    """Insert two sample users and commit them in one transaction.

    ``passw`` is NOT NULL on the model, so a value is supplied for both
    users.  The plaintext values are sample data for this demo only.
    """
    user1 = User(username="coleak", passw="123456", email="66666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.passw = "123456"
    user2.email = "99999@qq.com"
    db.session.add_all([user1, user2])
    db.session.commit()
    return "用户创建成功"


# --- Read -----------------------------------------------------------------
@app.route("/user/query")
def query_user():
    """Print every user whose username equals the ``username`` query argument.

    The matches are written to stdout; the HTTP response is a fixed string.
    """
    # Alternative: lookup by primary key, e.g. /user/query?id=1
    # user = db.session.get(User, request.args.get('id'))
    username = request.args.get('username')
    users = User.query.filter_by(username=username)
    for user in users:
        print(f"{user.id}:{user.username}-{user.email}")
    return "数据查找成功!"


# --- Update ---------------------------------------------------------------
@app.route("/user/update")
def update_user():
    """Set the email of the first user matching ``username`` to ``newemail``.

    Returns 400 when either query argument is missing and 404 when no user
    matches, instead of failing with an unhandled exception.
    """
    username = request.args.get('username')
    newemail = request.args.get('newemail')
    if username is None or newemail is None:
        # email is NOT NULL, so a missing value could never be committed.
        return "username and newemail are required", 400
    user = User.query.filter_by(username=username).first()
    if user is None:
        # .first() yields None when nothing matches.
        return "user not found", 404
    user.email = newemail
    db.session.commit()
    return "update--successful"


# --- Delete ---------------------------------------------------------------
@app.route("/user/delete")
def delete_user():
    """Delete the first user whose username equals the ``username`` argument.

    Returns 404 when no user matches.
    """
    username = request.args.get('username')
    user = User.query.filter_by(username=username).first()
    if user is None:
        # Passing None to session.delete() would raise.
        return "user not found", 404
    db.session.delete(user)
    db.session.commit()
    return "delete--successful"


# --- Root route -----------------------------------------------------------
@app.route('/')
def hello_world():
    """Return a fixed greeting for the root URL."""
    return 'Hello World!'


if __name__ == '__main__':
    app.run()
# Bind Flask-Migrate to the app and its database handle.
# Shell commands used afterwards:
#   flask db init      -- run only once; generates the migrations directory
#   flask db migrate
#   flask db upgrade
migrate = Migrate(app, db)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。