当前位置:   article > 正文

Flask+mysql 实现增删改查接口开发+测试(图文教程附源码)_flask连接mysql实现增删改查

flask连接mysql实现增删改查

目录

一、前言

二、环境准备

2.1 安装python

2.2 安装mysql 

三、工具准备

3.1 安装pycharm

3.2 安装Navicat

3.3 安装postman

四、mysql数据库准备

4.1 Navicat连接

4.2 数据库准备

五、增删改查接口(服务)开发

5.1 表的增删改查操作 

5.2 增加接口服务

5.3 删除接口服务

5.4 修改接口服务

5.5 查询接口服务

六、接口测试

 6.1 测试增加接口服务

 6.2 测试删除接口服务

 6.3 测试修改接口服务

 6.4 测试查询接口服务

七、总结

八、展望

九、附录(源代码)


一、前言

       1.1 之前在练习了Django的博客项目,前后端不分离的,现在就想着把Flask框架也再熟悉一下,后续把接口返回的数据用vue显示。python 比较流行的框架是Django 和Flask,重要性不言而喻

       1.2 刚好是五一,发文符合劳动节精神,深圳又下雨,在家搬砖

二、环境准备

2.1 安装python

       确保你的电脑安装了python,并正确配置好环境变量。查看版本命令:

python -V

本文内容运行在python 3.6.1

2.2 安装mysql 

       确保你的电脑安装了mysql,并正确设置账号和密码,本机账号和密码为 root/root

mysql -V

本文内容mysql 5.7中运行通过

三、工具准备

3.1 安装pycharm

       确保你的电脑安装了pycharm,我这里用的是pycharm professional 2017.3(已pojie)

3.2 安装Navicat

       可视化数据库查询工具准备,工具很多,本文用的是拿手的Navicat 

3.3 安装postman

        postman是一款必会的工具,开发和测试都在用

四、mysql数据库准备

4.1 Navicat连接

       连接名起一个test,主机输入localhost或者127.0.0.1,默认端口3306,输入用户名和密码root/root,点击链接测试,链接成功

4.2 数据库准备

       在链接上test数据源后,在test处右键,新建数据库,数据库名books,字符集utf-8,排序规则选择utf8_general_ci,点击确认,数据新建完成

 

注意:此处不新建表,从代码里面通过模型类建表 

五、增删改查接口(服务)开发

       开发前先做一些准备工作,新建flask项目,file ->new project

 创建完成。我这里book是根目录,主文件我改名字app.py,不该也可以此时就可以右键启动了

控制台会打印 访问地址,点击访问 浏览器页面显示 Hello World!,我这里代码删了不贴图了。

 安装一些用的库,点击file->settings,flask、PyMySQL、SQLAlchemy、flask-sqlalchemy等(注意有些库使用的时候的名字和导入的时候的名字不一样,比如导入flask-sqlalchemy,代码里面却是flask_sqlalchemy)

接着,逐步把以下代码添加到app.py文件中。先导入一些要用到的库

  1. import pymysql
  2. from flask import Flask
  3. from flask_sqlalchemy import SQLAlchemy
  4. from flask import make_response,request
  5. from flask_cors import CORS
  6. pymysql.install_as_MySQLdb()

数据库设计,使用flask框架模型类添加,代码如下

先连接数据库,修改账号、密码、主机名、端口号、数据库名为你们使用的即可

  1. app = Flask(__name__)
  2. # ------------------database----------------------------
  3. app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:root@localhost:3306/books'
  4. # 指定数据库文件
  5. app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
  6. # 允许修改跟踪数据库
  7. db = SQLAlchemy(app)

模型类代码:下面建立一个books表,字段有id、title、author、read_status,注意字段大小和类型,这些都是事先要把需求确定好,不然测试肯定有问题。比如:有需求说作者不能超过20个字,下面这种设计120大小就不合适。这就是实际开发和学习的区别

  1. class Books(db.Model):
  2. id = db.Column(db.Integer, primary_key=True, comment='自动递增id,唯一键')
  3. title = db.Column(db.String(80), nullable=False, comment='书名')
  4. author = db.Column(db.String(120), nullable=False, comment='作者')
  5. read_status = db.Column(db.Boolean, comment='阅读状态,0未读,1已读')

通过下面这行代码把表创建好

db.create_all()  # 创建表(表创建好后可注释掉)

查看数据库表DDL

 

5.1 表的增删改查操作 

       以下是增删改查方法代码:

  1. # 增加数据
  2. def insert_data(title, author, read_status):
  3. book = Books(title=title, author=author, read_status=read_status)
  4. db.session.add_all([book])
  5. db.session.commit()
  6. # 查询所有
  7. def select_data_all():
  8. book_list = []
  9. books = Books.query.all()
  10. # 类似于 select * from Books
  11. for s in books:
  12. dic = {}
  13. dic['id'] = s.id
  14. dic['title'] = s.title
  15. dic['author'] = s.author
  16. dic['read_status'] = s.read_status
  17. book_list.append(dic)
  18. return book_list
  19. # 通过id查询
  20. def select_data_by_id(id):
  21. book = Books.query.get(id)
  22. if not book:
  23. return False
  24. dic = {}
  25. dic['id'] = book.id
  26. dic['title'] = book.title
  27. dic['author'] = book.author
  28. dic['read_status'] = book.read_status
  29. return dic
  30. # 通过id删除数据
  31. def delete_data(id):
  32. # 类似于 select * from Books where id = id
  33. delete_id = Books.query.get(id)
  34. if not delete_id:
  35. return False
  36. db.session.delete(delete_id)
  37. db.session.commit()
  38. # 提交操作到数据库
  39. # 修改数据
  40. def update_data(id, title='', author='', read_status='', new_id=''):
  41. book = Books.query.get(id)
  42. if not title == '':
  43. book.title = title
  44. if not author == '':
  45. book.author = author
  46. if not read_status == '':
  47. book.read_status = read_status
  48. if not new_id == '':
  49. book.id = new_id
  50. db.session.commit()

插播,以下代码解决跨域问题(这是我为后面准备开发页面用的)

  1. # 解决浏览器浏览器访问输出乱码问题
  2. app.config['JSON_AS_ASCII'] = False
  3. @app.after_request
  4. def after(resp):
  5. resp = make_response(resp)
  6. resp.headers['Access-Control-Allow-Origin'] = '*' # 允许跨域地址
  7. resp.headers['Access-Control-Allow-Methods'] = '*' # 请求 ‘*’ 就是全部
  8. resp.headers['Access-Control-Allow-Headers'] = 'x-requested-with,content-type' # 头部
  9. resp.headers['Access-Control-Allow-Credentials'] = 'True'
  10. return resp
  11. CORS(app, resources=r'/*', supports_credentials=True)

5.2 增加接口服务

       讲增加接口服务前,定义以下需要传的参数title、author、read_status,title不能重复,

title、author、read_status是必传参数,title、author不能为空,read_status阅读状态只能为0和1

后续测试场景就从这里来,后面的同理

  1. # 前端通过传参title、author、read_status增加书籍
  2. @app.route('/add', methods=['POST'])
  3. def add():
  4. response_object = {'status': 'success'}
  5. if request.method == 'POST':
  6. post_data = request.get_json()
  7. print('调用add方传过来的参数是', post_data)
  8. book_list = select_data_all()
  9. for i in range(len(book_list)):
  10. title_list = book_list[i]['title']
  11. if post_data.get('title') in title_list:
  12. response_object['message'] = '书名(title)重复!'
  13. response_object["status"]= 'fail'
  14. return response_object
  15. if post_data.get('title') is None:
  16. response_object['message'] = 'title是必传参数!'
  17. response_object["status"]= 'fail'
  18. return response_object
  19. if post_data.get('author') is None:
  20. response_object['message'] = 'author是必传参数!'
  21. response_object["status"]= 'fail'
  22. return response_object
  23. if post_data.get('read_status') is None:
  24. response_object['message'] = 'read_status是必传参数!'
  25. response_object["status"]= 'fail'
  26. return response_object
  27. title = str(post_data.get('title')).strip(),
  28. author = str(post_data.get('author')).strip(),
  29. read_status = post_data.get('read_status')
  30. if title[0] is None or title[0] is '':
  31. response_object['message'] = 'title不能为空!'
  32. response_object["status"] = 'fail'
  33. return response_object
  34. if author[0] is None or author[0] is '':
  35. response_object['message'] = '作者不能为空!'
  36. response_object["status"] = 'fail'
  37. return response_object
  38. if read_status != 0 and read_status != 1:
  39. response_object['message'] = '阅读状态只能为0和1!'
  40. response_object["status"] = 'fail'
  41. return response_object
  42. insert_data(title=title[0], author=author[0], read_status=read_status)
  43. response_object['message'] = '图书添加成功!'
  44. return response_object

代码点评:

  • 使用not in检查必需的参数是否存在,如果不存在则返回HTTP状态码400(Bad Request)。
  • 对于空字符串的判断,使用not titlenot author,而不是与None进行比较。
  • 使用int(post_data['read_status'])read_status转换为整数类型。
  • 为了避免重复的书名,提取已存在的书名列表,并使用列表推导式构建title_list
  • 在返回错误响应时,包含了HTTP状态码400。
  1. @app.route('/add', methods=['POST'])
  2. def add():
  3. response_object = {'status': 'success'}
  4. if request.method == 'POST':
  5. post_data = request.get_json()
  6. print('调用add方传过来的参数是', post_data)
  7. if 'title' not in post_data:
  8. response_object['message'] = 'title是必传参数!'
  9. response_object['status'] = 'fail'
  10. return response_object, 400
  11. if 'author' not in post_data:
  12. response_object['message'] = 'author是必传参数!'
  13. response_object['status'] = 'fail'
  14. return response_object, 400
  15. if 'read_status' not in post_data:
  16. response_object['message'] = 'read_status是必传参数!'
  17. response_object['status'] = 'fail'
  18. return response_object, 400
  19. title = post_data['title'].strip()
  20. author = post_data['author'].strip()
  21. read_status = int(post_data['read_status'])
  22. if not title:
  23. response_object['message'] = 'title不能为空!'
  24. response_object['status'] = 'fail'
  25. return response_object, 400
  26. if not author:
  27. response_object['message'] = '作者不能为空!'
  28. response_object['status'] = 'fail'
  29. return response_object, 400
  30. if read_status not in (0, 1):
  31. response_object['message'] = '阅读状态只能为0和1!'
  32. response_object['status'] = 'fail'
  33. return response_object, 400
  34. book_list = select_data_all()
  35. title_list = [book['title'] for book in book_list]
  36. if title in title_list:
  37. response_object['message'] = '书名(title)重复!'
  38. response_object['status'] = 'fail'
  39. return response_object, 400
  40. insert_data(title=title, author=author, read_status=read_status)
  41. response_object['message'] = '图书添加成功!'
  42. return response_object

5.3 删除接口服务

       讲删除接口服务前,定义以下需要传的参数,id是必传参数,输入不存在的id删除报错图书不存在

  1. # 前端通过传id删除书籍
  2. @app.route('/delete', methods=['DELETE'])
  3. def delete():
  4. response_object = {'status': 'success'}
  5. if request.method == 'DELETE':
  6. post_data = request.get_json()
  7. print('调用delete方传过来的参数是:', post_data)
  8. if post_data.get('id') is None:
  9. response_object['message'] = 'id是必传参数!'
  10. response_object["status"]= 'fail'
  11. return response_object
  12. id = post_data.get('id')
  13. result = delete_data(id) # 删除方法调用
  14. if result is False:
  15. response_object['message'] = '需要删除的图书不存在!'
  16. response_object["status"] = 'fail'
  17. return response_object
  18. else:
  19. response_object['message'] = '图书被删除!'
  20. return response_object

代码点评:

  • 不再使用== None进行空值判断,而是使用not in检查键是否存在于字典中。
  • 对于缺少必需的id参数,返回了HTTP状态码400(Bad Request)。
  • 对于找不到需要删除的图书,返回了HTTP状态码404(Not Found)。
  • 使用not result判断删除方法的结果,而不是使用== False
  • 省略了不必要的response_object["status"] = 'success',因为默认状态就是成功。
  1. @app.route('/delete', methods=['POST'])
  2. def delete():
  3. response_object = {'status': 'success'}
  4. if request.method == 'POST':
  5. post_data = request.get_json()
  6. print('调用delete方传过来的参数是:', post_data)
  7. if 'id' not in post_data:
  8. response_object['message'] = 'id是必传参数!'
  9. response_object['status'] = 'fail'
  10. return response_object, 400
  11. id = post_data['id']
  12. result = delete_data(id) # 删除方法调用
  13. if not result:# 这行代码有bug
  14. response_object['message'] = '需要删除的图书不存在!'
  15. response_object['status'] = 'fail'
  16. return response_object, 404
  17. response_object['message'] = '图书被删除!'
  18. return response_object

上面的代码有bug,如果改为

if not result:

会导致的效果是调用删除接口,但是实际上返回   需要删除的图书不存在!

 改为:原来的:

if result is False:

就OK了 

但是我查chatgpt,又说是一样的。不是啥情况

 

5.4 修改接口服务

       在讲修改接口服务前,定义以下需要传的参数id、title、author、read_status是必传参数,title、author不能为空,read_status阅读状态只能为0和1,输入不存在的id报错需要修改的书籍id不存在

  1. # 前端通过传参title、author、read_status修改书籍
  2. @app.route('/update', methods=['POST'])
  3. def update():
  4. response_object = {'status': 'success'}
  5. if request.method == 'POST':
  6. post_data = request.get_json()
  7. print('调用update方传过来的参数是', post_data)
  8. if post_data.get('id') is None:
  9. response_object['message'] = 'id是必传参数!'
  10. response_object["status"]= 'fail'
  11. return response_object
  12. if post_data.get('title') is None:
  13. response_object['message'] = 'title是必传参数!'
  14. response_object["status"]= 'fail'
  15. return response_object
  16. if post_data.get('author') is None:
  17. response_object['message'] = 'author是必传参数!'
  18. response_object["status"]= 'fail'
  19. return response_object
  20. if post_data.get('read_status') is None:
  21. response_object['message'] = 'read_status是必传参数!'
  22. response_object["status"]= 'fail'
  23. return response_object
  24. # 查询所有数据
  25. book_list = select_data_all()
  26. # 拼接所有的id到列表
  27. for i in range(len(book_list)):
  28. id_list = book_list[i]['id']
  29. # 判断书籍id在不在列表内
  30. if post_data.get('id') is not id_list and int(post_data.get('id')) is not id_list:
  31. response_object['message'] = '需要修改的书籍id不存在!'
  32. response_object["status"]= 'fail'
  33. return response_object
  34. title = str(post_data.get('title')).strip(),
  35. author = str(post_data.get('author')).strip(),
  36. read_status = post_data.get('read_status')
  37. if title[0] is None or title[0] is '':
  38. response_object['message'] = 'title不能为空!'
  39. response_object["status"] = 'fail'
  40. return response_object
  41. if author[0] is None or author[0] is '':
  42. response_object['message'] = '作者不能为空!'
  43. response_object["status"] = 'fail'
  44. return response_object
  45. if read_status != 0 and read_status != 1:
  46. response_object['message'] = '阅读状态只能为0和1!'
  47. response_object["status"] = 'fail'
  48. return response_object
  49. books_id = post_data.get('id')
  50. title = post_data.get('title')
  51. author = post_data.get('author')
  52. read_status = post_data.get('read_status')
  53. update_data(id=books_id, title=title, author=author, read_status=read_status)
  54. response_object['message'] = '图书已更新!'
  55. return response_object

代码点评:

  • 使用 required_fields 列表来存储必传参数,遍历检查是否存在。
  • 使用列表推导式生成 id_list,避免在循环中重复拼接。
  • 使用 not titlenot author 的形式来检查是否为空,更简洁。
  • 使用列表形式 [0, 1] 来检查阅读状态,避免使用多个不等式判断。
  • 删除不必要的 response_object 变量。
  • 不需要显式地指定 HTTP 响应状态码,Flask 会根据返回的 JSON 对象自动设置状态码
  1. from flask import json
  2. @app.route('/update', methods=['POST'])
  3. def update():
  4. if request.method == 'POST':
  5. post_data = request.get_json()
  6. print('调用update方传过来的参数是', post_data)
  7. required_fields = ['id', 'title', 'author', 'read_status']
  8. for field in required_fields:
  9. if field not in post_data:
  10. return json.dumps({
  11. 'status': 'fail',
  12. 'message': f'{field}是必传参数!'
  13. }), 400
  14. book_list = select_data_all()
  15. id_list = [book['id'] for book in book_list]
  16. if post_data['id'] not in id_list:
  17. return json.dumps({
  18. 'status': 'fail',
  19. 'message': '需要修改的书籍id不存在!'
  20. }), 404
  21. title = str(post_data['title']).strip()
  22. author = str(post_data['author']).strip()
  23. #read_status = post_data['read_status']#这里需要强制转换不然前端代码报错
  24. read_status = int(post_data['read_status'])
  25. if not title:
  26. return json.dumps({
  27. 'status': 'fail',
  28. 'message': 'title不能为空!'
  29. }), 400
  30. if not author:
  31. return json.dumps({
  32. 'status': 'fail',
  33. 'message': '作者不能为空!'
  34. }), 400
  35. if read_status not in [0, 1]:
  36. return json.dumps({
  37. 'status': 'fail',
  38. 'message': '阅读状态只能为0和1!'
  39. }), 400
  40. books_id = post_data['id']
  41. update_data(id=books_id, title=title, author=author, read_status=read_status)
  42. return json.dumps({
  43. 'status': 'success',
  44. 'message': '图书已更新!'
  45. })

5.5 查询接口服务

       在讲查询接口服务前,定义以下需要传的参数

不传id默认查询所有,传了id查询数据库为id的记录,传了id,id就不能为空

  1. # 前端通过不传参默认查询所有书籍,传id查询对应书籍
  2. @app.route('/query', methods=['POST'])
  3. def query():
  4. response_object = {'status': 'success'}
  5. if request.method == 'POST':
  6. post_data = request.get_json()
  7. print('调用query方传过来的参数是', post_data)
  8. if post_data.get('id') is None:
  9. books = select_data_all()
  10. response_object['message'] = '查询所有书籍成功!'
  11. response_object['books'] = books
  12. return response_object
  13. id = str(post_data.get('id')).strip()
  14. if id is None or id is '':
  15. response_object['message'] = 'id不能为空!'
  16. response_object["status"] = 'fail'
  17. return response_object
  18. book = select_data_by_id(id)
  19. if book is False:
  20. response_object['message'] = '需要查询的图书不存在!'
  21. response_object["status"] = 'fail'
  22. return response_object
  23. else:
  24. response_object['message'] = '查询书籍成功!'
  25. response_object['books'] = book
  26. return response_object

代码点评:

  • 移除了冗余的条件判断和注释。
  • 使用is None进行空值检查,而不是与None进行比较。
  • 将空字符串的判断条件改为id == '' or id is None,使代码更加简洁。
  • 在最后直接返回response_object,避免重复的return语句。

  1. @app.route('/query', methods=['POST'])
  2. def query():
  3. response_object = {'status': 'success'}
  4. if request.method == 'POST':
  5. post_data = request.get_json()
  6. print('调用query方传过来的参数是', post_data)
  7. id = str(post_data.get('id')).strip()
  8. if id == '' or id is None:
  9. books = select_data_all()
  10. response_object['message'] = '查询所有图书成功!'
  11. response_object['data'] = books
  12. return response_object
  13. book = select_data_by_id(id)
  14. if book is False:
  15. response_object['message'] = '需要查询的图书不存在!'
  16. response_object["status"] = 'fail'
  17. else:
  18. response_object['message'] = '图书查询成功!'
  19. response_object['data'] = book
  20. return response_object

所有服务搞完了,启动项目

  

六、接口测试

       接口写完,准备测试(实际开发中肯定是边开发边测试的,大家注意)

用以下代码:

insert_data("《水浒传》", "吴承恩", 1)

或者手工往数据库先加一些数据用来测试,现在往里面添加

 6.1 测试增加接口服务

       测试如下

请求参数,json格式的

  1. {
  2. "author": "保尔柯察金",
  3. "read_status":0,
  4. "title": "《钢铁是怎么炼成的》"
  5. }

 点击发送,添加成功。

检查数据库,新增了记录

在这个基础上再次 send

 

书名重复,校验正确。其他的测试场景这里就不列了 

 6.2 测试删除接口服务

       测试如下,我们把刚才新增的书删除掉

请求参数:

  1. {
  2. "id":60
  3. }

 删除成功

在这个基础上再次 send

图书不存在校验成功 

数据库记录已被删除 

 6.3 测试修改接口服务

       测试如下

我们修改第4本书

请求参数:

  1. {
  2. "id":4,
  3. "author": "罗贯中1",
  4. "read_status":0,
  5. "title": "《三国演义1》"
  6. }

 更新成功

 检查数据库

成功,其他的测试场景这里就不列了 

 6.4 测试查询接口服务

       测试如下

请求参数不传默认查询所有

  1. {
  2. }

send 

 返回数据:

  1. {
  2. "books": [
  3. {
  4. "author": "施耐庵",
  5. "id": 1,
  6. "read_status": true,
  7. "title": "《西游记》"
  8. },
  9. {
  10. "author": "曹雪芹",
  11. "id": 2,
  12. "read_status": false,
  13. "title": "《红楼梦》"
  14. },
  15. {
  16. "author": "吴承恩",
  17. "id": 3,
  18. "read_status": true,
  19. "title": "《水浒传》"
  20. },
  21. {
  22. "author": "罗贯中1",
  23. "id": 4,
  24. "read_status": false,
  25. "title": "《三国演义1》"
  26. }
  27. ],
  28. "message": "查询所有书籍成功!",
  29. "status": "success"
  30. }

 与数据库一致

查询所有书籍成功

 通过id查询对应书籍,选择查第4本书,请求参数

  1. {
  2. "id":4
  3. }

结果 ,查询成功

 对照返回信息与数据库信息一致,验证ok

其他的测试场景这里也不列了,大家可以试下 ,比如传不存在的图书id

七、总结

       测试场景真的很多,测试不通过就加校验,测试不通过就加校验,如此反复(当然在这个过程中复习了很多基础知识),所以实际开发肯定是按照需求来做校验,不然时间肯定是不够的。因为现在的迭代很快,作为测试考虑开发,确实是想怎么简单怎么来,站在测试的角度,又会觉得,这个开发咋这又没做校验啊,那也没做校验啊。。。。。。做好本职工作,要做好角色转换,轻松拿捏职场

八、展望

       后续根据技能学习目标和职业生涯发展规则会继续学习,flask是测试开发必知必会的内容,希望大家好好掌握,做一个优秀的测试工程师

九、附录(源代码)

  1. # -*- coding: utf-8 -*-
  2. # @Author : Liqiju
  3. # @Time : 2022/5/1 2:45
  4. # @File : app.py
  5. # @Software: PyCharm
  6. import pymysql
  7. from flask import Flask
  8. from flask_sqlalchemy import SQLAlchemy
  9. from flask import make_response,request
  10. from flask_cors import CORS
  11. pymysql.install_as_MySQLdb()
  12. app = Flask(__name__)
  13. # ------------------database----------------------------
  14. app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:root@localhost:3306/books'
  15. # 指定数据库文件
  16. app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
  17. # 允许修改跟踪数据库
  18. db = SQLAlchemy(app)
  19. class Books(db.Model):
  20. id = db.Column(db.Integer, primary_key=True, comment='自动递增id,唯一键')
  21. title = db.Column(db.String(80), nullable=False, comment='书名')
  22. author = db.Column(db.String(120), nullable=False, comment='作者')
  23. read_status = db.Column(db.Boolean, comment='阅读状态,0未读,1已读')
  24. # 增加数据
  25. def insert_data(title, author, read_status):
  26. book = Books(title=title, author=author, read_status=read_status)
  27. db.session.add_all([book])
  28. db.session.commit()
  29. # 查询所有
  30. def select_data_all():
  31. book_list = []
  32. books = Books.query.all()
  33. # 类似于 select * from Books
  34. for s in books:
  35. dic = {}
  36. dic['id'] = s.id
  37. dic['title'] = s.title
  38. dic['author'] = s.author
  39. dic['read_status'] = s.read_status
  40. book_list.append(dic)
  41. return book_list
  42. # 通过id查询
  43. def select_data_by_id(id):
  44. book = Books.query.get(id)
  45. if not book:
  46. return False
  47. dic = {}
  48. dic['id'] = book.id
  49. dic['title'] = book.title
  50. dic['author'] = book.author
  51. dic['read_status'] = book.read_status
  52. return dic
  53. # 通过id删除数据
  54. def delete_data(id):
  55. # 类似于 select * from Books where id = id
  56. delete_id = Books.query.get(id)
  57. if not delete_id:
  58. return False
  59. db.session.delete(delete_id)
  60. db.session.commit()
  61. # 提交操作到数据库
  62. # 修改数据
  63. def update_data(id, title='', author='', read_status='', new_id=''):
  64. book = Books.query.get(id)
  65. if not title == '':
  66. book.title = title
  67. if not author == '':
  68. book.author = author
  69. if not read_status == '':
  70. book.read_status = read_status
  71. if not new_id == '':
  72. book.id = new_id
  73. db.session.commit()
  74. # 解决浏览器浏览器访问输出乱码问题
  75. app.config['JSON_AS_ASCII'] = False
  76. @app.after_request
  77. def after(resp):
  78. resp = make_response(resp)
  79. resp.headers['Access-Control-Allow-Origin'] = '*' # 允许跨域地址
  80. resp.headers['Access-Control-Allow-Methods'] = '*' # 请求 ‘*’ 就是全部
  81. resp.headers['Access-Control-Allow-Headers'] = 'x-requested-with,content-type' # 头部
  82. resp.headers['Access-Control-Allow-Credentials'] = 'True'
  83. return resp
  84. CORS(app, resources=r'/*', supports_credentials=True)
  85. # 前端通过传参title、author、read_status增加书籍
  86. @app.route('/add', methods=['POST'])
  87. def add():
  88. response_object = {'status': 'success'}
  89. if request.method == 'POST':
  90. post_data = request.get_json()
  91. print('调用add方传过来的参数是', post_data)
  92. book_list = select_data_all()
  93. for i in range(len(book_list)):
  94. title_list = book_list[i]['title']
  95. if post_data.get('title') in title_list:
  96. response_object['message'] = '书名(title)重复!'
  97. response_object["status"]= 'fail'
  98. return response_object
  99. if post_data.get('title') is None:
  100. response_object['message'] = 'title是必传参数!'
  101. response_object["status"]= 'fail'
  102. return response_object
  103. if post_data.get('author') is None:
  104. response_object['message'] = 'author是必传参数!'
  105. response_object["status"]= 'fail'
  106. return response_object
  107. if post_data.get('read_status') is None:
  108. response_object['message'] = 'read_status是必传参数!'
  109. response_object["status"]= 'fail'
  110. return response_object
  111. title = str(post_data.get('title')).strip(),
  112. author = str(post_data.get('author')).strip(),
  113. read_status = post_data.get('read_status')
  114. if title[0] is None or title[0] is '':
  115. response_object['message'] = 'title不能为空!'
  116. response_object["status"] = 'fail'
  117. return response_object
  118. if author[0] is None or author[0] is '':
  119. response_object['message'] = '作者不能为空!'
  120. response_object["status"] = 'fail'
  121. return response_object
  122. if read_status != 0 and read_status != 1:
  123. response_object['message'] = '阅读状态只能为0和1!'
  124. response_object["status"] = 'fail'
  125. return response_object
  126. insert_data(title=title[0], author=author[0], read_status=read_status)
  127. response_object['message'] = '图书添加成功!'
  128. return response_object
  129. # 前端通过传id删除书籍
  130. @app.route('/delete', methods=['DELETE'])
  131. def delete():
  132. response_object = {'status': 'success'}
  133. if request.method == 'DELETE':
  134. post_data = request.get_json()
  135. print('调用delete方传过来的参数是:', post_data)
  136. if post_data.get('id') is None:
  137. response_object['message'] = 'id是必传参数!'
  138. response_object["status"]= 'fail'
  139. return response_object
  140. id = post_data.get('id')
  141. result = delete_data(id) # 删除方法调用
  142. if result is False:
  143. response_object['message'] = '需要删除的图书不存在!'
  144. response_object["status"] = 'fail'
  145. return response_object
  146. else:
  147. response_object['message'] = '图书被删除!'
  148. return response_object
  149. # 前端通过传参title、author、read_status修改书籍
  150. @app.route('/update', methods=['POST'])
  151. def update():
  152. response_object = {'status': 'success'}
  153. if request.method == 'POST':
  154. post_data = request.get_json()
  155. print('调用update方传过来的参数是', post_data)
  156. if post_data.get('id') is None:
  157. response_object['message'] = 'id是必传参数!'
  158. response_object["status"]= 'fail'
  159. return response_object
  160. if post_data.get('title') is None:
  161. response_object['message'] = 'title是必传参数!'
  162. response_object["status"]= 'fail'
  163. return response_object
  164. if post_data.get('author') is None:
  165. response_object['message'] = 'author是必传参数!'
  166. response_object["status"]= 'fail'
  167. return response_object
  168. if post_data.get('read_status') is None:
  169. response_object['message'] = 'read_status是必传参数!'
  170. response_object["status"]= 'fail'
  171. return response_object
  172. # 查询所有数据
  173. book_list = select_data_all()
  174. # 拼接所有的id到列表
  175. for i in range(len(book_list)):
  176. id_list = book_list[i]['id']
  177. # 判断书籍id在不在列表内
  178. if post_data.get('id') is not id_list and int(post_data.get('id')) is not id_list:
  179. response_object['message'] = '需要修改的书籍id不存在!'
  180. response_object["status"]= 'fail'
  181. return response_object
  182. title = str(post_data.get('title')).strip(),
  183. author = str(post_data.get('author')).strip(),
  184. read_status = post_data.get('read_status')
  185. if title[0] is None or title[0] is '':
  186. response_object['message'] = 'title不能为空!'
  187. response_object["status"] = 'fail'
  188. return response_object
  189. if author[0] is None or author[0] is '':
  190. response_object['message'] = '作者不能为空!'
  191. response_object["status"] = 'fail'
  192. return response_object
  193. if read_status != 0 and read_status != 1:
  194. response_object['message'] = '阅读状态只能为0和1!'
  195. response_object["status"] = 'fail'
  196. return response_object
  197. books_id = post_data.get('id')
  198. title = post_data.get('title')
  199. author = post_data.get('author')
  200. read_status = post_data.get('read_status')
  201. update_data(id=books_id, title=title, author=author, read_status=read_status)
  202. response_object['message'] = '图书已更新!'
  203. return response_object
  204. # 前端通过不传参默认查询所有书籍,传id查询对应书籍
  205. @app.route('/query', methods=['POST'])
  206. def query():
  207. response_object = {'status': 'success'}
  208. if request.method == 'POST':
  209. post_data = request.get_json()
  210. print('调用query方传过来的参数是', post_data)
  211. if post_data.get('id') is None:
  212. books = select_data_all()
  213. response_object['message'] = '查询所有书籍成功!'
  214. response_object['books'] = books
  215. return response_object
  216. id = str(post_data.get('id')).strip()
  217. if id is None or id is '':
  218. response_object['message'] = 'id不能为空!'
  219. response_object["status"] = 'fail'
  220. return response_object
  221. book = select_data_by_id(id)
  222. if book is False:
  223. response_object['message'] = '需要查询的图书不存在!'
  224. response_object["status"] = 'fail'
  225. return response_object
  226. else:
  227. response_object['message'] = '查询书籍成功!'
  228. response_object['books'] = book
  229. return response_object
  230. if __name__ == '__main__':
  231. # 默认是5000,这里设置5001避免本地冲突。打开debug方便调试
  232. # db.create_all() # 创建表(表创建好后可注释掉)
  233. # insert_data("《水浒传》", "吴承恩", 1) # 利用这个可以添加数据或者直接数据库手动加入
  234. # 注意这个时候没开启校验title唯一性是因为不是前端过来的请求
  235. app.run(debug=True, port=5001)

刚才试了在python 3.9版本中,“is” 要改为 “== ”号,不然报错 

有疑问评论区学习交流~ 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号