FastAPI 是一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 并基于标准的 Python 类型提示。
pip install fastapi
pip install "uvicorn[standard]"
ASGI(Asynchronous Server Gateway Interface)是 Python 异步服务器网关接口的缩写。
它定义了异步编程风格的 Python Web 服务器与 Web 服务器和应用程序之间的标准接口。
ASGI 旨在解决传统 Python Web 服务器(如WSGI)在高并发和长连接场景下的不足,使用异步编程模型来提高服务器的并发处理能力。
- 异步和事件驱动:ASGI 服务器采用异步非阻塞的编程模型,可以高效处理大量并发连接,避免进程/线程阻塞。它使用事件循环机制,在有事件到来时处理,否则释放资源做其他事情。
- 支持长连接和WebSocket:ASGI 服务器本身支持长连接和 WebSocket 协议,可用于构建实时通信应用,如在线聊天室、协作平台等。
标准统一接口:ASGI 定义了统一的服务器端接口规范,使Web服务器、中间件和应用程序之间松散耦合,易于拓展和维护。
高性能:相比传统 WSGI,ASGI 编程模型使用异步协程可以大幅降低上下文切换和内存开销,从而提高并发性能。目前支持 ASGI 的服务器有 Uvicorn、Daphne、Hypercorn 等,应用框架包括 Django Channels、FastAPI 等。ASGI 正在逐渐取代 WSGI 成为 Python Web 应用开发的新标准,特别适合于需要高并发、实时双向通信的应用场景。
from typing import Union from fastapi import FastAPI # 创建 FastAPI 实例 app = FastAPI() # 定义路由,处理 get 请求 @app.get("/") # 定义路由处理函数 async def read_root(): return {"Hello": "World"} # 定义路由,处理 get 请求,接收一个路径参数 item_id @app.get("/items/{item_id}") async def read_item( item_id: int, q: str = 'default', ): return {"item_id": item_id, "q": q}
$ uvicorn main:app --reload
INFO: Will watch for changes in these directories: ['I:\\Python\\Web\\FastAPI\\fastApiProject1']
INFO: Uvicorn running on (Press CTRL+C to quit)
INFO: Started reloader process [14776] using WatchFiles
INFO: Started server process [24944]
INFO: Waiting for application startup.
INFO: Application startup complete.
app = FastAPI()
创建的对象。访问 可以在页面看到路由处理函数返回的数据:
访问 可以在页面看到路由处理函数返回的数据:
浏览器访问 可以到达 API 界面:
FastAPI 自动文档Swagger UI 打不开。显示空白
打开这个页面,我们可以看到非常详细的 API 描述:
def read_item(
item_id: int,
q: str = 'default',
return {"item_id": item_id, "q": q}
路由装饰器中定义了路径 /items/{item_id},路径参数 item_id 在路由处理函数中明确定义为 int 数据类型,且未指定默认值,路径参数是用户必须传递的。
路由处理函数中,q 明确定义为 str 数据类型,且赋予了一个默认值 default,这种不是路径参数的路由处理函数参数就是查询参数。
多数情况我们会用 POST(最常用)、PUT 操作来发送数据,具体就是将数据放在这些请求的请求体中。
请求体和响应体的数据定义需要使用 pandatic,这是一个流行的数据验证工具。使用 Pydantic 模型声明请求体,能充分利用它的功能和优点:
from fastapi import FastAPI from pydantic import BaseModel class Item(BaseModel): name: str description: str | None = None price: float tax: float | None = None app = FastAPI() @app.post("/items/") async def create_item(item: Item): return item
我们通过使继承 pydantic 的 BaseModel 类来定义了一个类,通常会将它称为模型,在这个模型中我们定义了一些字段及其数据类型,然后我们用这个模型定义一个查询参数的类型。
上述模型声明如下 JSON 对象(即 Python 字典):
"name": "Foo",
"description": "An optional description",
"price": 45.2,
"tax": 3.5
发送: { "name": "Foo", "description": "An optional description", "tax": 3.5 } 返回: { "detail": [ { "type": "missing", "loc": [ "body", "price" ], "msg": "Field required", "input": { "name": "Foo", "description": "An optional description", "tax": 3.5 } } ] }
发送: { "name": "Foo", "description": "An optional description", "price": "error type", "tax": 3.5 } 返回: { "detail": [ { "type": "float_parsing", "loc": [ "body", "price" ], "msg": "Input should be a valid number, unable to parse string as a number", "input": "error type" } ] }
这就是 pandatic 实现请求体数据校验的最大好处!
接下来看看响应体:可以在任意的路径操作中使用 response_model
from typing import Any from fastapi import FastAPI from pydantic import BaseModel, EmailStr app = FastAPI() class UserIn(BaseModel): username: str password: str email: EmailStr full_name: str | None = None class UserOut(BaseModel): username: str email: EmailStr full_name: str | None = None @app.post("/user/", response_model=UserOut) async def create_user(user: UserIn) -> Any: return user if __name__ == "__main__": import uvicorn uvicorn.run(app)
即便我们的路径操作函数将会返回包含密码的相同输入用户,但我们已经将 response_model
声明为了不包含密码的 UserOut 模型,FastAPI 将会负责过滤掉未在输出模型中声明的所有数据。
发送: { "username": "Jonas Vingegaard", "password": "123456", "email": "test@test.com", "full_name": "Jonas Vingegaard Rasmussen" } 返回: { "username": "Jonas Vingegaard", "email": "test@test.com", "full_name": "Jonas Vingegaard Rasmussen" } 发送: { "username": "Jonas Vingegaard", "password": "123456", "email": "test@test.com" } 返回: { "username": "Jonas Vingegaard", "email": "test@test.com", "full_name": null } 发送: { "username": "Jonas Vingegaard", "password": "123456" } 返回: { "detail": [ { "type": "missing", "loc": [ "body", "email" ], "msg": "Field required", "input": { "username": "Jonas Vingegaard", "password": "123456" } } ] }
pip install sqlalchemy
└── sql_app
├── __init__.py
├── crud.py
├── database.py
├── main.py
├── models.py
└── schemas.py
# sql_app/database.py from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker # 为 SQLAlchemy 定义数据库 URL地址 SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db" # 创建 SQLAlchemy 引擎 engine = create_engine( SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} ) # 创建一个SessionLocal类 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # 创建一个Base类 Base = declarative_base()
# sql_app/models.py from sqlalchemy import Boolean, Column, ForeignKey, Integer, String from sql_app.database import Base class User(Base): __tablename__ = "users" __table_args__ = {'extend_existing': True} id = Column(Integer, primary_key=True) email = Column(String, unique=True) hashed_password = Column(String) is_active = Column(Boolean, default=True)
创建 Pydantic 模型:
# sql_app/schemas.py from pydantic import BaseModel class UserBase(BaseModel): email: str class UserCreate(UserBase): password: str class User(UserBase): id: int is_active: bool class Config: orm_mode = True
# main.py from fastapi import Depends, FastAPI, HTTPException from sqlalchemy.orm import Session from sql_app import crud, models, schemas from sql_app.database import SessionLocal, engine models.Base.metadata.create_all(bind=engine) app = FastAPI() # Dependency def get_db(): db = SessionLocal() try: yield db finally: db.close() @app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): db_user = crud.get_user_by_email(db, email=user.email) if db_user: raise HTTPException(status_code=400, detail="Email already registered") return crud.create_user(db=db, user=user) @app.get("/users/", response_model=list[schemas.User]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): users = crud.get_users(db, skip=skip, limit=limit) return users @app.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)): db_user = crud.get_user(db, user_id=user_id) if db_user is None: raise HTTPException(status_code=404, detail="User not found") return db_user if __name__ == "__main__": import uvicorn uvicorn.run(app)
添加 crud 操作实现:
# crud.py from sqlalchemy.orm import Session from sql_app import models, schemas def get_user(db: Session, user_id: int): return db.query(models.User).filter(models.User.id == user_id).first() def get_user_by_email(db: Session, email: str): return db.query(models.User).filter(models.User.email == email).first() def get_users(db: Session, skip: int = 0, limit: int = 100): return db.query(models.User).offset(skip).limit(limit).all() def create_user(db: Session, user: schemas.UserCreate): fake_hashed_password = user.password + "notreallyhashed" db_user = models.User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) db.commit() db.refresh(db_user) return db_user
去启动程序,访问 API 页面:
POST 创建一些数据,然后就能 GET 它们了。
from urllib.parse import quote from fastapi import FastAPI, UploadFile from fastapi.responses import StreamingResponse app = FastAPI() @app.post("/uploadfile/") async def create_upload_file(file: UploadFile): # 分块将大文件保存到磁盘 file_object = file.file file_name = file.filename # 判断文件夹是否存在,不存在则创建 import os if not os.path.exists("files"): os.mkdir("files") # 将文件写入到文件夹 with open(f"files/{file_name}", "wb") as f: while True: # 读取文件 chunk = file_object.read(10000) if not chunk: break f.write(chunk) return {"filename": file.filename} if __name__ == "__main__": import uvicorn uvicorn.run(app)
def file_iterator(file_path, chunk_size=8192): with open(file_path, mode="rb") as file_like: while True: chunk = file_like.read(chunk_size) if not chunk: break yield chunk # 流式下载文件 @app.get("/downloadfile/") async def download_file(file_name: str): file_path = f"files/{file_name}" # 判断文件是否存在 import os if not os.path.exists(file_path): return {"msg": "file not found"} else: headers = { 'Content-Disposition': f'attachment; filename="{quote(file_name)}"' } return StreamingResponse( file_iterator(file_path), media_type="application/octet-stream", headers=headers )
pip freeze > requirements.txt
在服务器安装 ASGI 服务器:
pip install "uvicorn[standard]"
uvicorn main:app --host --port 80
