赞
踩
1. Python 3.4 及更高版本的 Python 3 或Python 2.7 ,推荐使用最新版Python3。
2.安装Flask
pip install Flask
3.选择一款api测试工具
postman
Postman API Platformhttps://www.postman.com/
yapi
- from flask import Flask
- # 初始化app
- app = Flask(__name__)
-
- @app.route('/')
- def hello_world():
- return 'Hello, World!'
-
- if __name__ == "__main__":
- # host指能够访问该后端服务的ip,一般填写0.0.0.0,表示任意ip都可访问
- # port指该后端服务部署的端口,不指定时默认为5000
- # debug指是否开启调试模式,当开启调试模式,修改后端文件会自行重启后端服务
- app.run(host="0.0.0.0", port=5600, debug=False)
执行以上代码后在控制台有以下输出:
在浏览器中访问以上两个中的任意地址。
或使用接口工具请求。
- from flask import Flask
- from flask import request
- from flask import jsonify
- import os
- app = Flask(__name__)
-
- # 设置静态数据
- us_data = {
- "piter":{"sex":"man", "year":"24"}
- }
-
- # 一个接口由以下部分组成:路由、请求方法、接口函数。注意接口函数名不可重复。
- # 路由即url除ip和端口部分,使用 route() 装饰器来把函数绑定到 URL。
- # 请求方法即 HTTP 请求方式,这些请求方式是由 HTTP 协议定义的,用于描述客户端(如浏览器)如何与服务器进行交互。
- # Flask 支持所有标准的 HTTP 请求方式,并允许你通过装饰器和视图函数来灵活地处理它们。
-
- # GET请求,请求指定的页面信息,并返回实体主体。这是最常见的请求方式,通常用于获取数据或资源。
- @app.route('/get_data_by_us_name', methods=['GET'])
- def get_data_by_us_name():
- # 在flask中客户端发送的请求参数都可以使用request来获取。
- us_name = request.args.get("name")
- # 获取用户数据
- us_data_get = us_data.get(us_name, {})
- if us_data_get == {}:
- status_code = 400
- result = {
- "msg":"Query failed! Please check the query username.",
- "us_data":us_data_get,
- "code":status_code
- }
- else:
- status_code = 200
- result = {
- "msg":"Query successful.",
- "us_data":us_data_get,
- "code":status_code
- }
- # jsonify 是flask中一个用于生成 JSON 响应的便捷函数。
- return jsonify(result), status_code
-
- # POST请求,向指定资源提交数据进行处理请求,数据被包含在请求体中。
- # 提交json
- @app.route('/add_us_data_by_json', methods=['POST'])
- def add_us_data_by_json():
- global us_data
- add_us_data = request.get_json()
- if add_us_data["name"] in us_data:
- status_code = 400
- result = {
- "msg":"Add failed! Please check the query data.",
- "code":status_code
- }
- else:
- if add_us_data["name"] not in us_data:
- us_data[add_us_data["name"]] = {"sex":add_us_data["sex"], "year":add_us_data["year"]}
- status_code = 200
- result = {
- "msg":"Add successful.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # 例如提交表单
- @app.route('/add_us_data_by_form', methods=['POST'])
- def add_us_data_by_form():
- global us_data
- us_name = request.form.get("name",None)
- us_sex = request.form.get("sex",None)
- us_year = request.form.get("year",None)
- if us_name and us_sex and us_year:
- if us_name not in us_data:
- us_data[us_name] = {"sex":us_sex, "year":us_year}
- status_code = 200
- result = {
- "msg":"Add successful.",
- "code":status_code
- }
- else:
- status_code = 400
- result = {
- "msg":"Add failed! Please check the query data.",
- "code":status_code
- }
- # jsonify 是flask中一个用于生成 JSON 响应的便捷函数。
- return jsonify(result), status_code
-
- # 上传文件
- @app.route('/add_us_data_by_file', methods=['POST'])
- def add_us_data_by_file():
- us_pic_list = request.files.getlist("pic_list", None)
- us_pic = request.files.get("us_pic", None)
- pic_save_name_title = request.form.get("pic_save_name_title", None)
- if us_pic_list and us_pic and pic_save_name_title:
- if not os.path.isdir("./static"):
- os.mkdir("./static")
- for get_file in us_pic_list:
- get_file.save("./static/{}_{}".format(pic_save_name_title,get_file.filename))
- us_pic.save("./static/{}_{}".format(pic_save_name_title, us_pic.filename))
- status_code = 200
- result = {
- "msg":"Add successful.",
- "code":status_code
- }
- else:
- status_code = 400
- result = {
- "msg":"Add failed! Please check the query data.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # PUT请求,从客户端向服务器传送的数据取代指定的资源的内容。
- @app.route('/up_us_data/<name>', methods=['PUT'])
- def up_us_data(name):
- global us_data
- if name in us_data:
- us_year = request.form.get("year",None)
- if us_year:
- us_data[name]["year"] = us_year
- status_code = 200
- result = {
- "msg":"Updated successfully.",
- "code":status_code
- }
- else:
- result = {
- "msg":"Updated failed! Please check the query data.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # DELETE请求,请求服务器删除指定的资源或页面。
- @app.route('/delete_us_data/<name>', methods=['DELETE'])
- def delete_us_data(name):
- global us_data
- if name in us_data:
- us_data.pop(name)
- status_code = 200
- result = {
- "msg":"Delete successfully.",
- "code":status_code
- }
- else:
- result = {
- "msg":"Delete failed! Please check the query data.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # 此外在 Flask 中,你可以通过检查 request.method 来确定收到的请求类型,并在一个视图函数中处理多种请求方式。
- @app.route('/handle_request', methods=['GET', 'POST'])
- def handle_request():
- if request.method == 'GET':
- # 处理 GET 请求...
- return "Handling GET request."
- elif request.method == 'POST':
- # 处理 POST 请求...
- return "Handling POST request."
-
-
- if __name__ == "__main__":
- app.run(host="0.0.0.0", port=5600, debug=True)
运行代码后即可通过使用接口请求工具来测试接口。
- from flask import Flask
- from flask import request
- from flask import jsonify
- from flask_sqlalchemy import SQLAlchemy
-
- # 数据配置
- mysql_host = "xxx.xxx.xxx.xxx"
- mysql_port = 3360
- mysql_user = "xxx"
- mysql_ps = "xxx"
- mysql_database = "xxx"
-
- app = Flask(__name__)
- app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://{}:{}@{}:{}/{}".format(mysql_user, mysql_ps, mysql_host, mysql_port,mysql_database)
- # 禁用对象修改的跟踪
- app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
- # 建立连接
- db = SQLAlchemy(app)
-
- # 定义用户表
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- username = db.Column(db.String(80), unique=True, nullable=False)
- email = db.Column(db.String(120), unique=True, nullable=False)
-
- def __repr__(self):
- return f'<User {self.username}>'
-
- # 创建表
- with app.app_context():
- db.create_all()
-
- # 增
- @app.route('/add_us_data_by_json', methods=['POST'])
- def add_us_data_by_json():
- add_us_data = request.get_json()
- try:
- new_user = User(username=add_us_data['username'], email=add_us_data['email'])
- db.session.add(new_user)
- db.session.commit() # 提交事务以保存到数据库
- status_code = 200
- result = {
- "msg":"Add successful.",
- "code":status_code
- }
- except:
- db.session.rollback()
- status_code = 400
- result = {
- "msg":"Add failed! Please check the query data.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # 删
- @app.route('/delete_us_data/<id>', methods=['DELETE'])
- def delete_us_data(id):
- user = User.query.get(int(id)) # 通过 id 查询
- if user:
- db.session.delete(user)
- db.session.commit() # 提交事务以删除记录
- status_code = 200
- result = {
- "msg":"Delete successfully.",
- "code":status_code
- }
- else:
- db.session.rollback()
- status_code = 400
- result = {
- "msg":"Delete failed! Please check the query data.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # 改
- @app.route('/up_us_data/<id>', methods=['PUT'])
- def up_us_data(id):
- add_us_data = request.get_json()
- user = User.query.get(int(id))
- if user:
- user.email = add_us_data['email']
- user.username = add_us_data['username']
- db.session.commit() # 提交事务以保存更新
- status_code = 200
- result = {
- "msg":"Updated successfully.",
- "code":status_code
- }
- else:
- db.session.rollback()
- result = {
- "msg":"Updated failed! Please check the query data.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # 查
- @app.route('/get_data_by_us_name', methods=['GET'])
- def get_data_by_us_name():
- us_id = request.args.get("id",type=int)
- us_data = User.query.get(us_id)
- if not us_data:
- status_code = 400
- result = {
- "msg":"Query failed! Please check the query username.",
- "us_data":{},
- "code":status_code
- }
- else:
- us_data_get = {
- "id":us_data.id,
- "username":us_data.username,
- "email":us_data.email
- }
- status_code = 200
- result = {
- "msg":"Query successful.",
- "us_data":us_data_get,
- "code":status_code
- }
- return jsonify(result), status_code
-
- if __name__ == "__main__":
- app.run(host="0.0.0.0", port=5600, debug=True)
注意代码中数据库配置需要修改为实际的数据库配置,运行代码后即可通过使用接口请求工具来测试接口。
如上图所示,model文件通常用于定义数据表;static为静态文件夹用于存放一些用户上传的文件等;web用于存放蓝图,一般一个功能模块一个蓝图;app文件为程序入口,用于启动整个后端;congfig文件用于存放配置信息,如数据库配置等;requirements为该项目的依赖。
将上个例子拆解为这种结构后,对应代码如下:
app.py
- from flask_cors import CORS
- from web import create_app
-
- app = create_app()
- # 允许跨域请求
- CORS(app, supports_credentials=True)
-
- if __name__ == '__main__':
- app.run(debug=False, host="0.0.0.0", port=5600)
config.py
- mysql_host = "xxx.xxx.xxx.xxx"
- mysql_port = 3360
- mysql_user = "xxx"
- mysql_ps = "xxx"
- mysql_database = "xxx"
-
- class Config():
- """工程配置信息"""
- # 数据库的配置信息
- SQLALCHEMY_DATABASE_URI = "mysql+pymysql://{}:{}@{}:{}/{}".format(mysql_user, mysql_ps, mysql_host, mysql_port,mysql_database)
- SQLALCHEMY_TRACK_MODIFICATIONS = False
'运行
models.py
- from flask_sqlalchemy import SQLAlchemy
-
- db = SQLAlchemy()
-
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- username = db.Column(db.String(80), unique=True, nullable=False)
- email = db.Column(db.String(120), unique=True, nullable=False)
-
- def __repr__(self):
- return f'<User {self.username}>'
web----__init__.py
- from flask import Flask
- from model.models import db
- from config import Config
-
- def create_app():
- app = Flask(__name__)
- app.config.from_object(Config)
- app.config['JSON_AS_ASCII'] = False
- # 查询时会显示原始SQL语句
- app.config['SQLALCHEMY_ECHO'] = False
- # 注册蓝图
- from web.user import user
- app.register_blueprint(user)
- db.init_app(app)
- with app.app_context():
- db.create_all()
- return app
web---user---views.py
- from . import user
- from flask import request, jsonify
- from model.models import db, User
-
- # 增
- @user.route('/add_us_data_by_json', methods=['POST'])
- def add_us_data_by_json():
- add_us_data = request.get_json()
- try:
- new_user = User(username=add_us_data['username'], email=add_us_data['email'])
- db.session.add(new_user)
- db.session.commit() # 提交事务以保存到数据库
- status_code = 200
- result = {
- "msg":"Add successful.",
- "code":status_code
- }
- except:
- db.session.rollback()
- status_code = 400
- result = {
- "msg":"Add failed! Please check the query data.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # 删
- @user.route('/delete_us_data/<id>', methods=['DELETE'])
- def delete_us_data(id):
- user = User.query.get(int(id)) # 通过 id 查询
- if user:
- db.session.delete(user)
- db.session.commit() # 提交事务以删除记录
- status_code = 200
- result = {
- "msg":"Delete successfully.",
- "code":status_code
- }
- else:
- db.session.rollback()
- status_code = 400
- result = {
- "msg":"Delete failed! Please check the query data.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # 改
- @user.route('/up_us_data/<id>', methods=['PUT'])
- def up_us_data(id):
- add_us_data = request.get_json()
- user = User.query.get(int(id))
- if user:
- user.email = add_us_data['email']
- user.username = add_us_data['username']
- db.session.commit() # 提交事务以保存更新
- status_code = 200
- result = {
- "msg":"Updated successfully.",
- "code":status_code
- }
- else:
- db.session.rollback()
- result = {
- "msg":"Updated failed! Please check the query data.",
- "code":status_code
- }
- return jsonify(result), status_code
-
- # 查
- @user.route('/get_data_by_us_name', methods=['GET'])
- def get_data_by_us_name():
- us_id = request.args.get("id",type=int)
- us_data = User.query.get(us_id)
- if not us_data:
- status_code = 400
- result = {
- "msg":"Query failed! Please check the query username.",
- "us_data":{},
- "code":status_code
- }
- else:
- us_data_get = {
- "id":us_data.id,
- "username":us_data.username,
- "email":us_data.email
- }
- status_code = 200
- result = {
- "msg":"Query successful.",
- "us_data":us_data_get,
- "code":status_code
- }
- return jsonify(result), status_code
web---user---__init__.py
- from flask import Blueprint
- user = Blueprint('user', __name__)
- from . import views
Flask 是一个用 Python 编写的轻量级 Web 应用框架。它旨在提供一个简单、可扩展的基础,用于构建 Web 服务和 Web 应用程序。以上仅是一些简单使用的例子,能帮助你快上手并跑通。在工作实践还会遇到更多复杂的场景,比如token认证、复杂的sql查询等。如需了解更多关于flask的知识请阅读官方文档。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。