
[flask-sqlalchemy] Mapping to MySQL with SQLAlchemy + PyMySQL


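Contents

Introduction

Third-party packages

Advantages

ORM

Database connection configuration

ORM model

Data types

Creating a table

Adding data

Querying data

Updating data

Deleting data

Foreign keys and table relationships

Migrations with migrate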


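Introduction

Third-party packages

pip install flask-sqlalchemy

pip install pymysql

Advantages

There is no SQL to write by hand, and the underlying library is robust, which makes the code easier to change, maintain, and migrate.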

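ORM

Most mainstream languages now have libraries that implement object-relational mapping (Object Relational Mapper, ORM for short). The main job of an ORM is to map each record in a database table to an object, so that every database operation becomes an operation on objects. This improves the readability and safety of the code.

Advantages of an ORM:

  1. Concise and readable: tables are abstracted as objects (data models), which is more intuitive.
  2. Portable: it wraps several database engines, so operations are largely the same across databases and the code is easier to maintain.
  3. Safer: values are passed as bound parameters, which effectively prevents SQL injection.

Performance is of course lower than executing raw SQL directly. This article covers some basic SQLAlchemy operations.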
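
To make the row-to-object idea concrete, here is a minimal sketch that uses only Python's standard library (sqlite3 and dataclasses) instead of SQLAlchemy and MySQL. The User dataclass and the find_users helper are hypothetical names chosen for illustration, and SQLAlchemy's real machinery is far more elaborate. The sketch also shows the bound parameter that guards against SQL injection.

```python
import sqlite3
from dataclasses import dataclass
from typing import List


@dataclass
class User:
    """One row of the user table, represented as a Python object."""
    id: int
    username: str
    email: str


def find_users(conn: sqlite3.Connection, username: str) -> List[User]:
    # The "?" placeholder sends the value separately from the SQL text,
    # so a malicious username cannot change the structure of the query.
    rows = conn.execute(
        "SELECT id, username, email FROM user WHERE username = ?",
        (username,),
    ).fetchall()
    # Map every row tuple onto a User object, roughly as an ORM would.
    return [User(*row) for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user (id INTEGER PRIMARY KEY, username TEXT, email TEXT)"
)
conn.execute(
    "INSERT INTO user (username, email) VALUES (?, ?)",
    ("coleak", "666@qq.com"),
)

users = find_users(conn, "coleak")
# A classic injection string is treated as a plain value and matches nothing.
injected = find_users(conn, "x' OR '1'='1")
print(users)
print(injected)
```

The caller only ever sees User objects, which is the readability gain described above; the cost is the extra mapping step on every query.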

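Database connection configuration

The connection URI has the following form:

<protocol>://<username>:<password>@<host>:<port>/<database>

With the mysqldb driver, the protocol is: mysql
With the pymysql driver, the protocol is: mysql+pymysql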
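
A password that contains characters such as "@" or "/" would break the URI format above unless it is percent-encoded first. The following is a small sketch of one way to assemble the URI safely; build_mysql_uri is a hypothetical helper name, and the credentials are made up for illustration.

```python
from urllib.parse import quote_plus


def build_mysql_uri(user, password, host, port, database, driver="pymysql"):
    """Assemble a SQLAlchemy-style MySQL URI from its parts."""
    # Percent-encode the credentials so characters such as "@" or "/"
    # in a password are not mistaken for URI delimiters.
    return (
        f"mysql+{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?charset=utf8mb4"
    )


uri = build_mysql_uri("root", "p@ss/word", "127.0.0.1", 3306, "flask")
print(uri)
```

The resulting string can be assigned to app.config['SQLALCHEMY_DATABASE_URI'] in the same way as the hand-written f-string in the listings.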

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)

    # Database configuration
    HOSTNAME = '127.0.0.1'
    PORT = 3306
    DATABASE = 'flask'
    USERNAME = 'root'
    PASSWORD = '123456'
    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4"
    db = SQLAlchemy(app)

    # Verify the connection by running a trivial query.
    # db.text reuses the existing extension object instead of creating a second one.
    with app.app_context():
        with db.engine.connect() as conn:
            rs = conn.execute(db.text("select 1"))
            print(rs.fetchone())

    @app.route('/')
    def hello_world():
        return 'Hello World!'

    if __name__ == '__main__':
        app.run()

If the connection succeeds, the script prints (1,).


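ORM model

Data types

  Integer: integer type; maps to int in the database.

  Float: floating point type; maps to float.

  Double, String, Boolean: double-precision float, variable-length string (varchar), and boolean.

  Decimal: fixed-point type, intended to avoid the precision loss of floating point types. It takes two arguments: the first is the total number of digits the column can store, and the second is the number of digits after the decimal point. In SQLAlchemy the generic type is named Numeric, for example db.Numeric(10, 2).

  Enum: enumeration type.

  Date: date type; year, month, day.

  DateTime: date and time type; year, month, day, hour, minute, second.

  Time: time type; hour, minute, second.

  Text: long string; maps to text, which in MySQL holds up to 65,535 bytes.

  LongText: long text type; maps to longtext. In SQLAlchemy this is the MySQL dialect type LONGTEXT from sqlalchemy.dialects.mysql.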

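The precision problem that the Decimal type addresses is easy to reproduce in plain Python. This sketch uses the standard library's decimal module as a stand-in for what a fixed-point column stores; it does not touch the database, and the values are made up for illustration.

```python
from decimal import Decimal

# Binary floating point cannot represent 0.1 or 0.2 exactly,
# so the sum is not exactly 0.3.
float_sum = 0.1 + 0.2

# Decimal keeps exact base-10 digits, like a fixed-point column.
decimal_sum = Decimal("0.10") + Decimal("0.20")

# Round to two digits after the decimal point, matching a scale of 2.
price = Decimal("19.999").quantize(Decimal("0.01"))

print(float_sum == 0.3)
print(decimal_sum == Decimal("0.30"))
print(price)
```

For money and other values where every digit matters, a fixed-point column is usually the safer choice than Float.
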
Creating a table

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)

    # Database configuration
    HOSTNAME = '127.0.0.1'
    PORT = 3306
    DATABASE = 'flask'
    USERNAME = 'root'
    PASSWORD = '123456'
    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4"
    db = SQLAlchemy(app)

    # Connection check from the previous listing, disabled here:
    # with app.app_context():
    #     with db.engine.connect() as conn:
    #         rs = conn.execute(db.text("select 1"))
    #         print(rs.fetchone())

    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        # varchar(50), NOT NULL
        username = db.Column(db.String(50), nullable=False)
        # varchar(120), unique, NOT NULL
        email = db.Column(db.String(120), unique=True, nullable=False)

    # Instances can be built with keyword arguments or by assigning attributes.
    # Nothing is written to the database until they are added to a session and committed.
    user1 = User(username="coleak", email="666@qq.com")
    user2 = User()
    user2.username = "ayue"
    user2.email = "999@qq.com"

    # Create the tables for all models; tables that already exist are left untouched
    with app.app_context():
        db.create_all()

    @app.route('/')
    def hello_world():
        return 'Hello World!'

    if __name__ == '__main__':
        app.run()


Adding data

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)

    # Database configuration
    HOSTNAME = '127.0.0.1'
    PORT = 3306
    DATABASE = 'flask'
    USERNAME = 'root'
    PASSWORD = '123456'
    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4"
    db = SQLAlchemy(app)

    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        # varchar(50), NOT NULL
        username = db.Column(db.String(50), nullable=False)
        # varchar(120), unique, NOT NULL
        email = db.Column(db.String(120), unique=True, nullable=False)

    @app.route('/user/add')
    def add_user():
        user1 = User(username="coleak", email="666@qq.com")
        user2 = User()
        user2.username = "ayue"
        user2.email = "999@qq.com"
        # Stage both objects, then write them in a single transaction
        db.session.add(user1)
        db.session.add(user2)
        db.session.commit()
        return "User created successfully"

    @app.route('/')
    def hello_world():
        return 'Hello World!'

    if __name__ == '__main__':
        app.run()


Querying data

    from flask import Flask, request
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)

    # Database configuration
    HOSTNAME = '127.0.0.1'
    PORT = 3306
    DATABASE = 'flask'
    USERNAME = 'root'
    PASSWORD = '123456'
    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4"
    db = SQLAlchemy(app)

    # The model inherits from db.Model
    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        # varchar(50), NOT NULL
        username = db.Column(db.String(50), nullable=False)
        # varchar(120), unique, NOT NULL
        email = db.Column(db.String(120), unique=True, nullable=False)

    @app.route('/user/add')
    def add_user():
        user1 = User(username="coleak", email="66666@qq.com")
        user2 = User()
        user2.username = "ayue"
        user2.email = "99999@qq.com"
        db.session.add(user1)
        db.session.add(user2)
        db.session.commit()
        return "User created successfully"

    # Two ways to look up rows: get (by primary key) and filter_by (by column value)
    @app.route("/user/query")
    def query_user():
        # Lookup by primary key:
        # id = request.args.get('id')
        # user = User.query.get(id)
        # http://127.0.0.1:5000/user/query?id=1
        username = request.args.get('username')
        users = User.query.filter_by(username=username)
        for user in users:
            print(f"{user.id}:{user.username}-{user.email}")
        return "Query succeeded!"

    @app.route('/')
    def hello_world():
        return 'Hello World!'

    if __name__ == '__main__':
        app.run()

Request URL: http://127.0.0.1:5000/user/query?username=coleak

Updating data

    from flask import Flask, request
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)

    # Database configuration
    HOSTNAME = '127.0.0.1'
    PORT = 3306
    DATABASE = 'flask'
    USERNAME = 'root'
    PASSWORD = '123456'
    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4"
    db = SQLAlchemy(app)

    # The model inherits from db.Model
    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        # varchar(50), NOT NULL
        username = db.Column(db.String(50), nullable=False)
        # varchar(120), unique, NOT NULL
        email = db.Column(db.String(120), unique=True, nullable=False)

    # Add data
    @app.route('/user/add')
    def add_user():
        user1 = User(username="coleak", email="66666@qq.com")
        user2 = User()
        user2.username = "ayue"
        user2.email = "99999@qq.com"
        db.session.add(user1)
        db.session.add(user2)
        db.session.commit()
        return "User created successfully"

    # Query data: get (by primary key) and filter_by (by column value)
    @app.route("/user/query")
    def query_user():
        # id = request.args.get('id')
        # user = User.query.get(id)
        # http://127.0.0.1:5000/user/query?id=1
        username = request.args.get('username')
        users = User.query.filter_by(username=username)
        for user in users:
            print(f"{user.id}:{user.username}-{user.email}")
        return "Query succeeded!"

    # Update data
    @app.route("/user/update")
    def update_user():
        username = request.args.get('username')
        newemail = request.args.get('newemail')
        user = User.query.filter_by(username=username).first()
        # first() returns None when no row matches
        if user is None:
            return "user not found", 404
        # Changing an attribute marks the object dirty; commit writes the UPDATE
        user.email = newemail
        db.session.commit()
        return "update--successful"

    # Root route
    @app.route('/')
    def hello_world():
        return 'Hello World!'

    if __name__ == '__main__':
        app.run()

Request URL: http://127.0.0.1:5000/user/update?username=coleak&newemail=666888@qq.com

Deleting data

    from flask import Flask, request
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)

    # Database configuration
    HOSTNAME = '127.0.0.1'
    PORT = 3306
    DATABASE = 'flask'
    USERNAME = 'root'
    PASSWORD = '123456'
    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4"
    db = SQLAlchemy(app)

    # The model inherits from db.Model
    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        # varchar(50), NOT NULL
        username = db.Column(db.String(50), nullable=False)
        # varchar(120), unique, NOT NULL
        email = db.Column(db.String(120), unique=True, nullable=False)

    # Add data
    @app.route('/user/add')
    def add_user():
        user1 = User(username="coleak", email="66666@qq.com")
        user2 = User()
        user2.username = "ayue"
        user2.email = "99999@qq.com"
        db.session.add(user1)
        db.session.add(user2)
        db.session.commit()
        return "User created successfully"

    # Query data: get (by primary key) and filter_by (by column value)
    @app.route("/user/query")
    def query_user():
        # id = request.args.get('id')
        # user = User.query.get(id)
        # http://127.0.0.1:5000/user/query?id=1
        username = request.args.get('username')
        users = User.query.filter_by(username=username)
        for user in users:
            print(f"{user.id}:{user.username}-{user.email}")
        return "Query succeeded!"

    # Update data
    @app.route("/user/update")
    def update_user():
        username = request.args.get('username')
        newemail = request.args.get('newemail')
        user = User.query.filter_by(username=username).first()
        if user is None:
            return "user not found", 404
        user.email = newemail
        db.session.commit()
        return "update--successful"

    # Delete data
    @app.route("/user/delete")
    def delete_user():
        username = request.args.get('username')
        user = User.query.filter_by(username=username).first()
        # first() returns None when no row matches
        if user is None:
            return "user not found", 404
        db.session.delete(user)
        db.session.commit()
        return "delete--successful"

    # Root route
    @app.route('/')
    def hello_world():
        return 'Hello World!'

    if __name__ == '__main__':
        app.run()

Request URL: http://127.0.0.1:5000/user/delete?username=coleak

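The first matching coleak row has now been deleted. Because first() returns a single row, any other rows with the same username remain.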
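
The effect of deleting only the first match can be sketched without a MySQL server. The example below uses the standard library's sqlite3 module with hand-written SQL that roughly corresponds to filter_by(...).first() followed by db.session.delete(...). The sample rows are assumptions for illustration, and the ORDER BY id clause is added here to make "first" deterministic; the ORM query in the listing does not specify an ordering.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "username TEXT NOT NULL, email TEXT NOT NULL UNIQUE)"
)
conn.executemany(
    "INSERT INTO user (username, email) VALUES (?, ?)",
    [
        ("coleak", "666@qq.com"),
        ("ayue", "999@qq.com"),
        ("coleak", "66666@qq.com"),
    ],
)

# Rough equivalent of filter_by(username=...).first(): fetch one matching row.
first = conn.execute(
    "SELECT id FROM user WHERE username = ? ORDER BY id LIMIT 1",
    ("coleak",),
).fetchone()

# Rough equivalent of db.session.delete(user): remove that row by primary key.
conn.execute("DELETE FROM user WHERE id = ?", (first[0],))

remaining = conn.execute(
    "SELECT id, username, email FROM user ORDER BY id"
).fetchall()
print(remaining)
```

The second coleak row survives, so removing every match would need a loop over the query results or a bulk delete instead of first().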

Foreign keys and table relationships

  • Creating a foreign key

    from flask import Flask, request
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)

    # Database configuration
    HOSTNAME = '127.0.0.1'
    PORT = 3306
    DATABASE = 'flask'
    USERNAME = 'root'
    PASSWORD = '123456'
    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4"
    db = SQLAlchemy(app)

    # The model inherits from db.Model
    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        # varchar(50), NOT NULL
        username = db.Column(db.String(50), nullable=False)
        # varchar(120), unique, NOT NULL
        email = db.Column(db.String(120), unique=True, nullable=False)

    class article(db.Model):
        __tablename__ = "article"
        arid = db.Column(db.Integer, primary_key=True, autoincrement=True)
        artitle = db.Column(db.String(100), nullable=False)
        arcontent = db.Column(db.Text, nullable=False)
        # Foreign key referencing the id column of the user table
        author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
        # backref automatically adds an articles attribute to User
        # for fetching that user's list of articles
        author = db.relationship("User", backref="articles")

    with app.app_context():
        db.create_all()

    # Creating a user and an article (left disabled):
    # coleak = User(username="coleak", email="678@163.com")
    # post = article()
    # post.artitle = "flask_learn"
    # post.arcontent = "flask_content"
    # post.author = coleak

    # Add data
    @app.route('/user/add')
    def add_user():
        user1 = User(username="coleak", email="66666@qq.com")
        user2 = User()
        user2.username = "ayue"
        user2.email = "99999@qq.com"
        db.session.add(user1)
        db.session.add(user2)
        db.session.commit()
        return "User created successfully"

    # Query data: get (by primary key) and filter_by (by column value)
    @app.route("/user/query")
    def query_user():
        # id = request.args.get('id')
        # user = User.query.get(id)
        # http://127.0.0.1:5000/user/query?id=1
        username = request.args.get('username')
        users = User.query.filter_by(username=username)
        for user in users:
            print(f"{user.id}:{user.username}-{user.email}")
        return "Query succeeded!"

    # Update data
    @app.route("/user/update")
    def update_user():
        username = request.args.get('username')
        newemail = request.args.get('newemail')
        user = User.query.filter_by(username=username).first()
        if user is None:
            return "user not found", 404
        user.email = newemail
        db.session.commit()
        return "update--successful"

    # Delete data
    @app.route("/user/delete")
    def delete_user():
        username = request.args.get('username')
        user = User.query.filter_by(username=username).first()
        if user is None:
            return "user not found", 404
        db.session.delete(user)
        db.session.commit()
        return "delete--successful"

    # Root route
    @app.route('/')
    def hello_world():
        return 'Hello World!'

    if __name__ == '__main__':
        app.run()

  • Related lookups through backref

    from flask import Flask, request
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)

    # Database configuration
    HOSTNAME = '127.0.0.1'
    PORT = 3306
    DATABASE = 'flask'
    USERNAME = 'root'
    PASSWORD = '123456'
    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4"
    db = SQLAlchemy(app)

    # The model inherits from db.Model
    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        # varchar(50), NOT NULL
        username = db.Column(db.String(50), nullable=False)
        # varchar(120), unique, NOT NULL
        email = db.Column(db.String(120), unique=True, nullable=False)

    class article(db.Model):
        __tablename__ = "article"
        arid = db.Column(db.Integer, primary_key=True, autoincrement=True)
        artitle = db.Column(db.String(100), nullable=False)
        arcontent = db.Column(db.Text, nullable=False)
        author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
        # backref automatically adds an articles attribute to User
        # for fetching that user's list of articles
        author = db.relationship("User", backref="articles")

    @app.route('/article/add')
    def add_article():
        # Assumes a user with id 2 already exists
        article1 = article(artitle="flask_learn", arcontent="flask_content")
        article1.author = User.query.get(2)
        article2 = article(artitle="flask_learn2", arcontent="flask_content2")
        article2.author = User.query.get(2)
        db.session.add_all([article1, article2])
        db.session.commit()
        return "Articles 1 and 2 added successfully"

    @app.route("/article/query")
    def query():
        user = User.query.get(2)
        # articles is the attribute created by backref on the User side
        for post in user.articles:
            print(post.artitle)
        return "Article query succeeded"

    # Add data
    @app.route('/user/add')
    def add_user():
        user1 = User(username="coleak", email="66666@qq.com")
        user2 = User()
        user2.username = "ayue"
        user2.email = "99999@qq.com"
        db.session.add(user1)
        db.session.add(user2)
        db.session.commit()
        return "User created successfully"

    # Query data: get (by primary key) and filter_by (by column value)
    @app.route("/user/query")
    def query_user():
        # id = request.args.get('id')
        # user = User.query.get(id)
        # http://127.0.0.1:5000/user/query?id=1
        username = request.args.get('username')
        users = User.query.filter_by(username=username)
        for user in users:
            print(f"{user.id}:{user.username}-{user.email}")
        return "Query succeeded!"

    # Update data
    @app.route("/user/update")
    def update_user():
        username = request.args.get('username')
        newemail = request.args.get('newemail')
        user = User.query.filter_by(username=username).first()
        if user is None:
            return "user not found", 404
        user.email = newemail
        db.session.commit()
        return "update--successful"

    # Delete data
    @app.route("/user/delete")
    def delete_user():
        username = request.args.get('username')
        user = User.query.filter_by(username=username).first()
        if user is None:
            return "user not found", 404
        db.session.delete(user)
        db.session.commit()
        return "delete--successful"

    # Root route
    @app.route('/')
    def hello_world():
        return 'Hello World!'

    if __name__ == '__main__':
        app.run()
 

Migrations with migrate

    from flask import Flask, request
    from flask_sqlalchemy import SQLAlchemy
    from flask_migrate import Migrate

    app = Flask(__name__)

    # Database configuration
    HOSTNAME = '127.0.0.1'
    PORT = 3306
    DATABASE = 'flask'
    USERNAME = 'root'
    PASSWORD = '123456'
    app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset=utf8mb4"
    db = SQLAlchemy(app)
    migrate = Migrate(app, db)
    # flask db init
    # flask db migrate
    # flask db upgrade

    # The model inherits from db.Model
    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        # varchar(50), NOT NULL
        username = db.Column(db.String(50), nullable=False)
        # New column, picked up by the next migration
        passw = db.Column(db.String(50), nullable=False)
        # varchar(120), unique, NOT NULL
        email = db.Column(db.String(120), unique=True, nullable=False)

    class article(db.Model):
        __tablename__ = "article"
        arid = db.Column(db.Integer, primary_key=True, autoincrement=True)
        artitle = db.Column(db.String(100), nullable=False)
        arcontent = db.Column(db.Text, nullable=False)
        author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
        # backref automatically adds an articles attribute to User
        # for fetching that user's list of articles
        author = db.relationship("User", backref="articles")

    @app.route('/article/add')
    def add_article():
        # Assumes a user with id 2 already exists
        article1 = article(artitle="flask_learn", arcontent="flask_content")
        article1.author = User.query.get(2)
        article2 = article(artitle="flask_learn2", arcontent="flask_content2")
        article2.author = User.query.get(2)
        db.session.add_all([article1, article2])
        db.session.commit()
        return "Articles 1 and 2 added successfully"

    @app.route("/article/query")
    def query():
        user = User.query.get(2)
        for post in user.articles:
            print(post.artitle)
        return "Article query succeeded"

    # Add data
    @app.route('/user/add')
    def add_user():
        # passw is NOT NULL, so it must be set; "123456" is only a placeholder
        user1 = User(username="coleak", passw="123456", email="66666@qq.com")
        user2 = User()
        user2.username = "ayue"
        user2.passw = "123456"
        user2.email = "99999@qq.com"
        db.session.add(user1)
        db.session.add(user2)
        db.session.commit()
        return "User created successfully"

    # Query data: get (by primary key) and filter_by (by column value)
    @app.route("/user/query")
    def query_user():
        # id = request.args.get('id')
        # user = User.query.get(id)
        # http://127.0.0.1:5000/user/query?id=1
        username = request.args.get('username')
        users = User.query.filter_by(username=username)
        for user in users:
            print(f"{user.id}:{user.username}-{user.email}")
        return "Query succeeded!"

    # Update data
    @app.route("/user/update")
    def update_user():
        username = request.args.get('username')
        newemail = request.args.get('newemail')
        user = User.query.filter_by(username=username).first()
        if user is None:
            return "user not found", 404
        user.email = newemail
        db.session.commit()
        return "update--successful"

    # Delete data
    @app.route("/user/delete")
    def delete_user():
        username = request.args.get('username')
        user = User.query.filter_by(username=username).first()
        if user is None:
            return "user not found", 404
        db.session.delete(user)
        db.session.commit()
        return "delete--successful"

    # Root route
    @app.route('/')
    def hello_world():
        return 'Hello World!'

    if __name__ == '__main__':
        app.run()
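
Flask-Migrate is a separate package (pip install flask-migrate). After Migrate is bound to the app, the three commands are run from the shell in the project directory:

    migrate = Migrate(app, db)
    # flask db init      run only once; creates the migrations directory
    # flask db migrate   generates a migration script from the model changes
    # flask db upgrade   applies the migration script to the database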
