赞
踩
FastAPI 是一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 并基于标准的 Python 类型提示。
关键特性:
简单来体验一下:
环境:
pip install fastapi
pip install "uvicorn[standard]"
ASGI(Asynchronous Server Gateway Interface)是 Python 异步服务器网关接口的缩写。
它定义了异步编程风格的 Python Web 服务器与 Web 服务器和应用程序之间的标准接口。
ASGI 旨在解决传统 Python Web 服务器(如WSGI)在高并发和长连接场景下的不足,使用异步编程模型来提高服务器的并发处理能力。
ASGI服务器具有以下几个关键特点:
- 异步和事件驱动:ASGI 服务器采用异步非阻塞的编程模型,可以高效处理大量并发连接,避免进程/线程阻塞。它使用事件循环机制,在有事件到来时处理,否则释放资源做其他事情。
- 支持长连接和WebSocket:ASGI 服务器本身支持长连接和 WebSocket 协议,可用于构建实时通信应用,如在线聊天室、协作平台等。
标准统一接口:ASGI 定义了统一的服务器端接口规范,使Web服务器、中间件和应用程序之间松散耦合,易于拓展和维护。
高性能:相比传统 WSGI,ASGI 编程模型使用异步协程可以大幅降低上下文切换和内存开销,从而提高并发性能。目前支持 ASGI 的服务器有 Uvicorn、Daphne、Hypercorn 等,应用框架包括 Django Channels、FastAPI 等。ASGI 正在逐渐取代 WSGI 成为 Python Web 应用开发的新标准,特别适合于需要高并发、实时双向通信的应用场景。
from typing import Union from fastapi import FastAPI # 创建 FastAPI 实例 app = FastAPI() # 定义路由,处理 get 请求 @app.get("/") # 定义路由处理函数 async def read_root(): return {"Hello": "World"} # 定义路由,处理 get 请求,接收一个路径参数 item_id @app.get("/items/{item_id}") async def read_item( item_id: int, q: str = 'default', ): return {"item_id": item_id, "q": q}
$ uvicorn main:app --reload
INFO: Will watch for changes in these directories: ['I:\\Python\\Web\\FastAPI\\fastApiProject1']
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [14776] using WatchFiles
INFO: Started server process [24944]
INFO: Waiting for application startup.
INFO: Application startup complete.
app = FastAPI()
创建的对象。访问 http://127.0.0.1:8000/ 可以在页面看到路由处理函数返回的数据:
访问 http://127.0.0.1:8000/items/5?q=somequery 可以在页面看到路由处理函数返回的数据:
浏览器访问 http://127.0.0.1:8000/docs 可以到达 API 界面:
但是国内可能打不开。
fastapi访问/docs和/redoc接口文档显示空白或无法加载
FastAPI 自动文档Swagger UI 打不开。显示空白
打开这个页面,我们可以看到非常详细的 API 描述:
这是怎么做到的?回到代码:
@app.get("/items/{item_id}")
def read_item(
item_id: int,
q: str = 'default',
):
return {"item_id": item_id, "q": q}
路由装饰器中定义了路径 /items/{item_id},路径参数 item_id 在路由处理函数中明确定义为 int 数据类型,且未指定默认值,路径参数是用户必须传递的。
路由处理函数中,q 明确定义为 str 数据类型,且赋予了一个默认值 default,这种不是路径参数的路由处理函数参数就是查询参数。
最重要的特征就是,我们为参数添加了类型声明,这样做有非常多好处:
多数情况我们会用 POST(最常用)、PUT 操作来发送数据,具体就是将数据放在这些请求的请求体中。
既然路径参数和查询参数都可以通过定义数据类型来完成默认的类型转化和校验,那么作为传述数据主要载体的请求体,自然也支持数据类型的定义带来的好处,响应体也是如此!
请求体和响应体的数据定义需要使用 pandatic,这是一个流行的数据验证工具。使用 Pydantic 模型声明请求体,能充分利用它的功能和优点:
from fastapi import FastAPI from pydantic import BaseModel class Item(BaseModel): name: str description: str | None = None price: float tax: float | None = None app = FastAPI() @app.post("/items/") async def create_item(item: Item): return item
我们通过使继承 pydantic 的 BaseModel 类来定义了一个类,通常会将它称为模型,在这个模型中我们定义了一些字段及其数据类型,然后我们用这个模型定义一个查询参数的类型。
上述模型声明如下 JSON 对象(即 Python 字典):
{
"name": "Foo",
"description": "An optional description",
"price": 45.2,
"tax": 3.5
}
运行项目,使用接口调试工具进行调试:
如果请求体数据必填项缺失:
发送: { "name": "Foo", "description": "An optional description", "tax": 3.5 } 返回: { "detail": [ { "type": "missing", "loc": [ "body", "price" ], "msg": "Field required", "input": { "name": "Foo", "description": "An optional description", "tax": 3.5 } } ] }
如果数据类型错误:
发送: { "name": "Foo", "description": "An optional description", "price": "error type", "tax": 3.5 } 返回: { "detail": [ { "type": "float_parsing", "loc": [ "body", "price" ], "msg": "Input should be a valid number, unable to parse string as a number", "input": "error type" } ] }
这就是 pandatic 实现请求体数据校验的最大好处!
接下来看看响应体:可以在任意的路径操作中使用 response_model
参数来声明用于响应的模型。
from typing import Any from fastapi import FastAPI from pydantic import BaseModel, EmailStr app = FastAPI() class UserIn(BaseModel): username: str password: str email: EmailStr full_name: str | None = None class UserOut(BaseModel): username: str email: EmailStr full_name: str | None = None @app.post("/user/", response_model=UserOut) async def create_user(user: UserIn) -> Any: return user if __name__ == "__main__": import uvicorn uvicorn.run(app)
即便我们的路径操作函数将会返回包含密码的相同输入用户,但我们已经将 response_model
声明为了不包含密码的 UserOut 模型,FastAPI 将会负责过滤掉未在输出模型中声明的所有数据。
发送: { "username": "Jonas Vingegaard", "password": "123456", "email": "test@test.com", "full_name": "Jonas Vingegaard Rasmussen" } 返回: { "username": "Jonas Vingegaard", "email": "test@test.com", "full_name": "Jonas Vingegaard Rasmussen" } 发送: { "username": "Jonas Vingegaard", "password": "123456", "email": "test@test.com" } 返回: { "username": "Jonas Vingegaard", "email": "test@test.com", "full_name": null } 发送: { "username": "Jonas Vingegaard", "password": "123456" } 返回: { "detail": [ { "type": "missing", "loc": [ "body", "email" ], "msg": "Field required", "input": { "username": "Jonas Vingegaard", "password": "123456" } } ] }
任何SQLAlchemy支持的数据库,如:
首先你需要安装SQLAlchemy:
pip install sqlalchemy
创建目录:
└── sql_app
├── __init__.py
├── crud.py
├── database.py
├── main.py
├── models.py
└── schemas.py
定义数据库:
# sql_app/database.py from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker # 为 SQLAlchemy 定义数据库 URL地址 SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db" # 创建 SQLAlchemy 引擎 engine = create_engine( SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} ) # 创建一个SessionLocal类 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # 创建一个Base类 Base = declarative_base()
创建数据库模型:
# sql_app/models.py from sqlalchemy import Boolean, Column, ForeignKey, Integer, String from sql_app.database import Base class User(Base): __tablename__ = "users" __table_args__ = {'extend_existing': True} id = Column(Integer, primary_key=True) email = Column(String, unique=True) hashed_password = Column(String) is_active = Column(Boolean, default=True)
创建 Pydantic 模型:
# sql_app/schemas.py from pydantic import BaseModel class UserBase(BaseModel): email: str class UserCreate(UserBase): password: str class User(UserBase): id: int is_active: bool class Config: orm_mode = True
主程序——controller:
# main.py from fastapi import Depends, FastAPI, HTTPException from sqlalchemy.orm import Session from sql_app import crud, models, schemas from sql_app.database import SessionLocal, engine models.Base.metadata.create_all(bind=engine) app = FastAPI() # Dependency def get_db(): db = SessionLocal() try: yield db finally: db.close() @app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): db_user = crud.get_user_by_email(db, email=user.email) if db_user: raise HTTPException(status_code=400, detail="Email already registered") return crud.create_user(db=db, user=user) @app.get("/users/", response_model=list[schemas.User]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): users = crud.get_users(db, skip=skip, limit=limit) return users @app.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)): db_user = crud.get_user(db, user_id=user_id) if db_user is None: raise HTTPException(status_code=404, detail="User not found") return db_user if __name__ == "__main__": import uvicorn uvicorn.run(app)
添加 crud 操作实现:
# crud.py from sqlalchemy.orm import Session from sql_app import models, schemas def get_user(db: Session, user_id: int): return db.query(models.User).filter(models.User.id == user_id).first() def get_user_by_email(db: Session, email: str): return db.query(models.User).filter(models.User.email == email).first() def get_users(db: Session, skip: int = 0, limit: int = 100): return db.query(models.User).offset(skip).limit(limit).all() def create_user(db: Session, user: schemas.UserCreate): fake_hashed_password = user.password + "notreallyhashed" db_user = models.User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) db.commit() db.refresh(db_user) return db_user
去启动程序,访问 API 页面:
POST 创建一些数据,然后就能 GET 它们了。
简单的文件上传:
from urllib.parse import quote from fastapi import FastAPI, UploadFile from fastapi.responses import StreamingResponse app = FastAPI() @app.post("/uploadfile/") async def create_upload_file(file: UploadFile): # 分块将大文件保存到磁盘 file_object = file.file file_name = file.filename # 判断文件夹是否存在,不存在则创建 import os if not os.path.exists("files"): os.mkdir("files") # 将文件写入到文件夹 with open(f"files/{file_name}", "wb") as f: while True: # 读取文件 chunk = file_object.read(10000) if not chunk: break f.write(chunk) return {"filename": file.filename} if __name__ == "__main__": import uvicorn uvicorn.run(app)
大文件下载:
def file_iterator(file_path, chunk_size=8192): with open(file_path, mode="rb") as file_like: while True: chunk = file_like.read(chunk_size) if not chunk: break yield chunk # 流式下载文件 @app.get("/downloadfile/") async def download_file(file_name: str): file_path = f"files/{file_name}" # 判断文件是否存在 import os if not os.path.exists(file_path): return {"msg": "file not found"} else: headers = { 'Content-Disposition': f'attachment; filename="{quote(file_name)}"' } return StreamingResponse( file_iterator(file_path), media_type="application/octet-stream", headers=headers )
先收集项目依赖:
pip freeze > requirements.txt
然后将项目发送到服务器。
在服务器安装 ASGI 服务器:
pip install "uvicorn[standard]"
然后运行服务器程序:
uvicorn main:app --host 0.0.0.0 --port 80
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。