响应状态码
在@app.post()
方法中添加status_code参数:
- from fastapi import FastAPI
-
- app = FastAPI()
-
-
- @app.post("/items/", status_code=201)
- async def create_item(name: str):
- return {"name": name}
status_code也可以是IntEnum
,比如Python的http.HTTPStatus
。
常见响应状态码:
-
100以上,信息;很少直接使用;
-
200以上,成功;200是OK,201是Created,204是No Content;
-
300以上,重定向;304是Not Modified;
-
400以上,客户端错误;404是Not Found;
-
500以上,服务器错误;
FastAPI引入了status,可以方便的录入这些状态:
- from fastapi import FastAPI, status
-
- app = FastAPI()
-
-
- @app.post("/items/", status_code=status.HTTP_201_CREATED)
- async def create_item(name: str):
- return {"name": name}
表单数据
为了使用表单,首先需要安装python-multipart:
pip install python-multipart
示例:
- from fastapi import FastAPI, Form
-
- app = FastAPI()
-
-
- @app.post("/login/")
- async def login(username: str = Form(), password: str = Form()):
- return {"username": username}
表单由HTML中的<form></form>
发送,请求头的content-type一般是application/x-www-form-urlencoded
,当为文件时multipart/form-data
。
请求文件
示例:
- from fastapi import FastAPI, File, UploadFile
-
- app = FastAPI()
-
-
- @app.post("/files/")
- async def create_file(file: bytes = File()):
- return {"file_size": len(file)}
-
-
- @app.post("/uploadfile/")
- async def create_upload_file(file: UploadFile):
- return {"filename": file.filename}
create_file()
的类型为bytes,接收到的文件内容也是bytes,数据都存在于内存中,适用于小文件。create_upload_file()
的类型为UploadFile,它会在内存设置一个最大存储,超出最大存储,就会把数据转存到磁盘,适用于大文件。
UploadFile有以下属性:
-
filename,文件名,比如myimage.jpg;
-
content_type,文件类型,比如image/jpeg;
-
file,SpooledTemporaryFile实例,一个file-like对象。
UploadFile有以下方法:
-
write(data):写数据(str或bytes)到文件;
-
read(size):从文件读size(int)大小的bytes或character;
-
seek(offset):定位到文件中offset(int)的位置,比如
await myfile.seek(0)
会定位到文件开始; -
close():关闭文件;
所有这些方法都是async的,需要await:
contents = await myfile.read()
不想await就使用其中的file对象:
contents = myfile.file.read()
文件可选非必传:
- from typing import Union
-
- from fastapi import FastAPI, File, UploadFile
-
- app = FastAPI()
-
-
- @app.post("/files/")
- async def create_file(file: Union[bytes, None] = File(default=None)):
- if not file:
- return {"message": "No file sent"}
- else:
- return {"file_size": len(file)}
-
-
- @app.post("/uploadfile/")
- async def create_upload_file(file: Union[UploadFile, None] = None):
- if not file:
- return {"message": "No upload file sent"}
- else:
- return {"filename": file.filename}
UploadFile的metadata:
- from fastapi import FastAPI, File, UploadFile
-
- app = FastAPI()
-
-
- @app.post("/files/")
- async def create_file(file: bytes = File(description="A file read as bytes")):
- return {"file_size": len(file)}
-
-
- @app.post("/uploadfile/")
- async def create_upload_file(
- file: UploadFile = File(description="A file read as UploadFile"),
- ):
- return {"filename": file.filename}
多文件上传:
- from typing import List
-
- from fastapi import FastAPI, File, UploadFile
- from fastapi.responses import HTMLResponse
-
- app = FastAPI()
-
-
- @app.post("/files/")
- async def create_files(files: List[bytes] = File()):
- return {"file_sizes": [len(file) for file in files]}
-
-
- @app.post("/uploadfiles/")
- async def create_upload_files(files: List[UploadFile]):
- return {"filenames": [file.filename for file in files]}
-
-
- @app.get("/")
- async def main():
- content = """
- <body>
- <form action="/files/" enctype="multipart/form-data" method="post">
- <input name="files" type="file" multiple>
- <input type="submit">
- </form>
- <form action="/uploadfiles/" enctype="multipart/form-data" method="post">
- <input name="files" type="file" multiple>
- <input type="submit">
- </form>
- </body>
- """
- return HTMLResponse(content=content)
同时请求表单和文件
示例:
- from fastapi import FastAPI, File, Form, UploadFile
-
- app = FastAPI()
-
-
- @app.post("/files/")
- async def create_file(
- file: bytes = File(), fileb: UploadFile = File(), token: str = Form()
- ):
- return {
- "file_size": len(file),
- "token": token,
- "fileb_content_type": fileb.content_type,
- }
错误处理
FastAPI提供了HTTPException:
- from fastapi import FastAPI, HTTPException
-
- app = FastAPI()
-
- items = {"foo": "The Foo Wrestlers"}
-
-
- @app.get("/items/{item_id}")
- async def read_item(item_id: str):
- if item_id not in items:
- raise HTTPException(status_code=404, detail="Item not found")
- return {"item": items[item_id]}
HTTPException不是return而是raise的,抛出异常。
对于抛出的异常,可以使用@app.exception_handler
自定义handler进行处理:
- from fastapi import FastAPI, Request
- from fastapi.responses import JSONResponse
-
-
- class UnicornException(Exception):
- def __init__(self, name: str):
- self.name = name
-
-
- app = FastAPI()
-
-
- @app.exception_handler(UnicornException)
- async def unicorn_exception_handler(request: Request, exc: UnicornException):
- return JSONResponse(
- status_code=418,
- content={"message": f"Oops! {exc.name} did something. There goes a rainbow..."},
- )
-
-
- @app.get("/unicorns/{name}")
- async def read_unicorn(name: str):
- if name == "yolo":
- raise UnicornException(name=name)
- return {"unicorn_name": name}
在抛出HTTPException异常时,FastAPI有很多默认的handler,比如RequestValidationError,可以使用此方法重写默认的handler:
- from fastapi import FastAPI, HTTPException
- from fastapi.exceptions import RequestValidationError
- from fastapi.responses import PlainTextResponse
- from starlette.exceptions import HTTPException as StarletteHTTPException
-
- app = FastAPI()
-
-
- @app.exception_handler(StarletteHTTPException)
- async def http_exception_handler(request, exc):
- return PlainTextResponse(str(exc.detail), status_code=exc.status_code)
-
-
- @app.exception_handler(RequestValidationError)
- async def validation_exception_handler(request, exc):
- return PlainTextResponse(str(exc), status_code=400)
-
-
- @app.get("/items/{item_id}")
- async def read_item(item_id: int):
- if item_id == 3:
- raise HTTPException(status_code=418, detail="Nope! I don't like 3.")
- return {"item_id": item_id}
默认handler会返回:
- {
- "detail": [
- {
- "loc": [
- "path",
- "item_id"
- ],
- "msg": "value is not a valid integer",
- "type": "type_error.integer"
- }
- ]
- }
而重写handler后会返回字符串:
- 1 validation error
- path -> item_id
- value is not a valid integer (type=type_error.integer)
如果不想改动默认handler,只是补充点信息,可以导入http_exception_handler和request_validation_exception_handler:
- from fastapi import FastAPI, HTTPException
- from fastapi.exception_handlers import (
- http_exception_handler,
- request_validation_exception_handler,
- )
- from fastapi.exceptions import RequestValidationError
- from starlette.exceptions import HTTPException as StarletteHTTPException
-
- app = FastAPI()
-
-
- @app.exception_handler(StarletteHTTPException)
- async def custom_http_exception_handler(request, exc):
- print(f"OMG! An HTTP error!: {repr(exc)}")
- return await http_exception_handler(request, exc)
-
-
- @app.exception_handler(RequestValidationError)
- async def validation_exception_handler(request, exc):
- print(f"OMG! The client sent invalid data!: {exc}")
- return await request_validation_exception_handler(request, exc)
-
-
- @app.get("/items/{item_id}")
- async def read_item(item_id: int):
- if item_id == 3:
- raise HTTPException(status_code=418, detail="Nope! I don't like 3.")
- return {"item_id": item_id}
路径操作配置
响应状态码:
- from typing import Set, Union
-
- from fastapi import FastAPI, status
- from pydantic import BaseModel
-
- app = FastAPI()
-
-
- class Item(BaseModel):
- name: str
- description: Union[str, None] = None
- price: float
- tax: Union[float, None] = None
- tags: Set[str] = set()
-
-
- @app.post("/items/", response_model=Item, status_code=status.HTTP_201_CREATED)
- async def create_item(item: Item):
- return item
标签:
- from typing import Set, Union
-
- from fastapi import FastAPI
- from pydantic import BaseModel
-
- app = FastAPI()
-
-
- class Item(BaseModel):
- name: str
- description: Union[str, None] = None
- price: float
- tax: Union[float, None] = None
- tags: Set[str] = set()
-
-
- @app.post("/items/", response_model=Item, tags=["items"])
- async def create_item(item: Item):
- return item
-
-
- @app.get("/items/", tags=["items"])
- async def read_items():
- return [{"name": "Foo", "price": 42}]
-
-
- @app.get("/users/", tags=["users"])
- async def read_users():
- return [{"username": "johndoe"}]
标签枚举:
- from enum import Enum
-
- from fastapi import FastAPI
-
- app = FastAPI()
-
-
- class Tags(Enum):
- items = "items"
- users = "users"
-
-
- @app.get("/items/", tags=[Tags.items])
- async def get_items():
- return ["Portal gun", "Plumbus"]
-
-
- @app.get("/users/", tags=[Tags.users])
- async def read_users():
- return ["Rick", "Morty"]
概要和描述:
- from typing import Set, Union
-
- from fastapi import FastAPI
- from pydantic import BaseModel
-
- app = FastAPI()
-
-
- class Item(BaseModel):
- name: str
- description: Union[str, None] = None
- price: float
- tax: Union[float, None] = None
- tags: Set[str] = set()
-
-
- @app.post(
- "/items/",
- response_model=Item,
- summary="Create an item",
- description="Create an item with all the information, name, description, price, tax and a set of unique tags",
- )
- async def create_item(item: Item):
- return item
文档字符串:
- from typing import Set, Union
-
- from fastapi import FastAPI
- from pydantic import BaseModel
-
- app = FastAPI()
-
-
- class Item(BaseModel):
- name: str
- description: Union[str, None] = None
- price: float
- tax: Union[float, None] = None
- tags: Set[str] = set()
-
-
- @app.post("/items/", response_model=Item, summary="Create an item")
- async def create_item(item: Item):
- """
- Create an item with all the information:
- - **name**: each item must have a name
- - **description**: a long description
- - **price**: required
- - **tax**: if the item doesn't have tax, you can omit this
- - **tags**: a set of unique tag strings for this item
- """
- return item
响应描述:
- from typing import Set, Union
-
- from fastapi import FastAPI
- from pydantic import BaseModel
-
- app = FastAPI()
-
-
- class Item(BaseModel):
- name: str
- description: Union[str, None] = None
- price: float
- tax: Union[float, None] = None
- tags: Set[str] = set()
-
-
- @app.post(
- "/items/",
- response_model=Item,
- summary="Create an item",
- response_description="The created item",
- )
- async def create_item(item: Item):
- """
- Create an item with all the information:
- - **name**: each item must have a name
- - **description**: a long description
- - **price**: required
- - **tax**: if the item doesn't have tax, you can omit this
- - **tags**: a set of unique tag strings for this item
- """
- return item
标记为deprecated:
- from fastapi import FastAPI
-
- app = FastAPI()
-
-
- @app.get("/items/", tags=["items"])
- async def read_items():
- return [{"name": "Foo", "price": 42}]
-
-
- @app.get("/users/", tags=["users"])
- async def read_users():
- return [{"username": "johndoe"}]
-
-
- @app.get("/elements/", tags=["items"], deprecated=True)
- async def read_elements():
- return [{"item_id": "Foo"}]
JSON兼容编码器
jsonable_encoder()
函数的作用是把Pydantic model转换成JSON兼容的类型比如dict、list等。
- from datetime import datetime
- from typing import Union
-
- from fastapi import FastAPI
- from fastapi.encoders import jsonable_encoder
- from pydantic import BaseModel
-
- fake_db = {}
-
-
- class Item(BaseModel):
- title: str
- timestamp: datetime
- description: Union[str, None] = None
-
-
- app = FastAPI()
-
-
- @app.put("/items/{id}")
- def update_item(id: str, item: Item):
- json_compatible_item_data = jsonable_encoder(item)
- fake_db[id] = json_compatible_item_data
Body-更新
使用PUT:
- from typing import List, Union
-
- from fastapi import FastAPI
- from fastapi.encoders import jsonable_encoder
- from pydantic import BaseModel
-
- app = FastAPI()
-
-
- class Item(BaseModel):
- name: Union[str, None] = None
- description: Union[str, None] = None
- price: Union[float, None] = None
- tax: float = 10.5
- tags: List[str] = []
-
-
- items = {
- "foo": {"name": "Foo", "price": 50.2},
- "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
- "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
- }
-
-
- @app.get("/items/{item_id}", response_model=Item)
- async def read_item(item_id: str):
- return items[item_id]
-
-
- @app.put("/items/{item_id}", response_model=Item)
- async def update_item(item_id: str, item: Item):
- update_item_encoded = jsonable_encoder(item)
- items[item_id] = update_item_encoded
- return update_item_encoded
输入数据使用了jsonable_encoder()
函数转换为JSON兼容类型。
使用PATCH:
exclude_unset=True
- from typing import List, Union
-
- from fastapi import FastAPI
- from fastapi.encoders import jsonable_encoder
- from pydantic import BaseModel
-
- app = FastAPI()
-
-
- class Item(BaseModel):
- name: Union[str, None] = None
- description: Union[str, None] = None
- price: Union[float, None] = None
- tax: float = 10.5
- tags: List[str] = []
-
-
- items = {
- "foo": {"name": "Foo", "price": 50.2},
- "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
- "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
- }
-
-
- @app.get("/items/{item_id}", response_model=Item)
- async def read_item(item_id: str):
- return items[item_id]
-
-
- @app.patch("/items/{item_id}", response_model=Item)
- async def update_item(item_id: str, item: Item):
- stored_item_data = items[item_id]
- stored_item_model = Item(**stored_item_data)
- update_data = item.dict(exclude_unset=True)
- updated_item = stored_item_model.copy(update=update_data)
- items[item_id] = jsonable_encoder(updated_item)
- return updated_item
.copy(update=update_data)
- from typing import List, Union
-
- from fastapi import FastAPI
- from fastapi.encoders import jsonable_encoder
- from pydantic import BaseModel
-
- app = FastAPI()
-
-
- class Item(BaseModel):
- name: Union[str, None] = None
- description: Union[str, None] = None
- price: Union[float, None] = None
- tax: float = 10.5
- tags: List[str] = []
-
-
- items = {
- "foo": {"name": "Foo", "price": 50.2},
- "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
- "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
- }
-
-
- @app.get("/items/{item_id}", response_model=Item)
- async def read_item(item_id: str):
- return items[item_id]
-
-
- @app.patch("/items/{item_id}", response_model=Item)
- async def update_item(item_id: str, item: Item):
- stored_item_data = items[item_id]
- stored_item_model = Item(**stored_item_data)
- update_data = item.dict(exclude_unset=True)
- updated_item = stored_item_model.copy(update=update_data)
- items[item_id] = jsonable_encoder(updated_item)
- return updated_item
PUT和PATCH都可以用来部分更新,PUT用的更多。
依赖
什么是依赖注入?在FastAPI里面,你可以在路径操作函数中添加依赖的声明,然后FastAPI会自动加载这些依赖。
依赖注入的好处有:
- 复用代码;
- 复用数据库连接;
- 增强安全、认证、角色;
- 等等等;
依赖注入示例:
- from typing import Union
-
- from fastapi import Depends, FastAPI
-
- app = FastAPI()
-
-
- async def common_parameters(
- q: Union[str, None] = None, skip: int = 0, limit: int = 100
- ):
- return {"q": q, "skip": skip, "limit": limit}
-
-
- @app.get("/items/")
- async def read_items(commons: dict = Depends(common_parameters)):
- return commons
-
-
- @app.get("/users/")
- async def read_users(commons: dict = Depends(common_parameters)):
- return commons
common_parameters()
函数是个简单的依赖;- Depends引入依赖;
FastAPI就会自动调用common_parameters()
函数并把结果返回给commons,而无需任何其他代码。
依赖也可以使用class,把common_parameters()
函数改为CommonQueryParams
类:
- from typing import Union
-
- from fastapi import Depends, FastAPI
-
- app = FastAPI()
-
-
- fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]
-
-
- class CommonQueryParams:
- def __init__(self, q: Union[str, None] = None, skip: int = 0, limit: int = 100):
- self.q = q
- self.skip = skip
- self.limit = limit
-
-
- @app.get("/items/")
- async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)):
- response = {}
- if commons.q:
- response.update({"q": commons.q})
- items = fake_items_db[commons.skip : commons.skip + commons.limit]
- response.update({"items": items})
- return response
Depends会创建一个CommonQueryParams的实例然后赋值给commons。
更一步简化,只写1次CommonQueryParams:
commons: CommonQueryParams = Depends()
Depends()
里面的CommonQueryParams可以省略掉。
FastAPI支持子依赖,也就是Depends嵌套:
- from typing import Union
-
- from fastapi import Cookie, Depends, FastAPI
-
- app = FastAPI()
-
-
- def query_extractor(q: Union[str, None] = None):
- return q
-
-
- def query_or_cookie_extractor(
- q: str = Depends(query_extractor),
- last_query: Union[str, None] = Cookie(default=None),
- ):
- if not q:
- return last_query
- return q
-
-
- @app.get("/items/")
- async def read_query(query_or_default: str = Depends(query_or_cookie_extractor)):
- return {"q_or_cookie": query_or_default}
如果使用同一个依赖多次,FastAPI默认会只注入一次。可以按以下设置让FastAPI注入多次:
- async def needy_dependency(fresh_value: str = Depends(get_value, use_cache=False)):
- return {"fresh_value": fresh_value}
多个依赖可以用dependencies
的list:
- from fastapi import Depends, FastAPI, Header, HTTPException
-
- app = FastAPI()
-
-
- async def verify_token(x_token: str = Header()):
- if x_token != "fake-super-secret-token":
- raise HTTPException(status_code=400, detail="X-Token header invalid")
-
-
- async def verify_key(x_key: str = Header()):
- if x_key != "fake-super-secret-key":
- raise HTTPException(status_code=400, detail="X-Key header invalid")
- return x_key
-
-
- @app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)])
- async def read_items():
- return [{"item": "Foo"}, {"item": "Bar"}]
如果给FastAPI的构造函数传入dependencies
,那么就是全局依赖:
- from fastapi import Depends, FastAPI, Header, HTTPException
-
-
- async def verify_token(x_token: str = Header()):
- if x_token != "fake-super-secret-token":
- raise HTTPException(status_code=400, detail="X-Token header invalid")
-
-
- async def verify_key(x_key: str = Header()):
- if x_key != "fake-super-secret-key":
- raise HTTPException(status_code=400, detail="X-Key header invalid")
- return x_key
-
-
- app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)])
-
-
- @app.get("/items/")
- async def read_items():
- return [{"item": "Portal Gun"}, {"item": "Plumbus"}]
-
-
- @app.get("/users/")
- async def read_users():
- return [{"username": "Rick"}, {"username": "Morty"}]
如果在依赖函数中使用yield,它后面的代码就相当于teardown,这点用法跟pytest的fixture类似:
- async def get_db():
- db = DBSession()
- try:
- yield db
- finally:
- db.close()
另外,借助yield和with可以创建一个上下文管理器(实现__enter__
和__exit__
):
- class MySuperContextManager:
- def __init__(self):
- self.db = DBSession()
-
- def __enter__(self):
- return self.db
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.db.close()
-
-
- async def get_db():
- with MySuperContextManager() as db:
- yield db
安全
FastAPI支持OAuth2协议:
- from fastapi import Depends, FastAPI
- from fastapi.security import OAuth2PasswordBearer
-
- app = FastAPI()
-
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
-
-
- @app.get("/items/")
- async def read_items(token: str = Depends(oauth2_scheme)):
- return {"token": token}
备注:需要提前安装 pip install python-multipart
,因为OAuth2使用表单来发送username和password。虽然这个接口已经加上鉴权了。但这些入参都没有生效,因为我们还没有添加相应的处理代码。为了让鉴权实际生效,我们继续添加代码:
- from typing import Union
-
- from fastapi import Depends, FastAPI
- from fastapi.security import OAuth2PasswordBearer
- from pydantic import BaseModel
-
- app = FastAPI()
-
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
-
-
- class User(BaseModel):
- username: str
- email: Union[str, None] = None
- full_name: Union[str, None] = None
- disabled: Union[bool, None] = None
-
-
- def fake_decode_token(token):
- return User(
- username=token + "fakedecoded", email="john@example.com", full_name="John Doe"
- )
-
-
- async def get_current_user(token: str = Depends(oauth2_scheme)):
- user = fake_decode_token(token)
- return user
-
-
- @app.get("/users/me")
- async def read_users_me(current_user: User = Depends(get_current_user)):
- return current_user
- 定义模型User;
- 创建依赖get_current_user;
- fake_decode_token接收token,返回模拟的假用户;
- read_users_me注入依赖;
然后实现username和password:
- from typing import Union
-
- from fastapi import Depends, FastAPI, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from pydantic import BaseModel
-
- fake_users_db = {
- "johndoe": {
- "username": "johndoe",
- "full_name": "John Doe",
- "email": "johndoe@example.com",
- "hashed_password": "fakehashedsecret",
- "disabled": False,
- },
- "alice": {
- "username": "alice",
- "full_name": "Alice Wonderson",
- "email": "alice@example.com",
- "hashed_password": "fakehashedsecret2",
- "disabled": True,
- },
- }
-
- app = FastAPI()
-
-
- def fake_hash_password(password: str):
- return "fakehashed" + password
-
-
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
-
-
- class User(BaseModel):
- username: str
- email: Union[str, None] = None
- full_name: Union[str, None] = None
- disabled: Union[bool, None] = None
-
-
- class UserInDB(User):
- hashed_password: str
-
-
- def get_user(db, username: str):
- if username in db:
- user_dict = db[username]
- return UserInDB(**user_dict)
-
-
- def fake_decode_token(token):
- # This doesn't provide any security at all
- # Check the next version
- user = get_user(fake_users_db, token)
- return user
-
-
- async def get_current_user(token: str = Depends(oauth2_scheme)):
- user = fake_decode_token(token)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid authentication credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return user
-
-
- async def get_current_active_user(current_user: User = Depends(get_current_user)):
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
-
-
- @app.post("/token")
- async def login(form_data: OAuth2PasswordRequestForm = Depends()):
- user_dict = fake_users_db.get(form_data.username)
- if not user_dict:
- raise HTTPException(status_code=400, detail="Incorrect username or password")
- user = UserInDB(**user_dict)
- hashed_password = fake_hash_password(form_data.password)
- if not hashed_password == user.hashed_password:
- raise HTTPException(status_code=400, detail="Incorrect username or password")
-
- return {"access_token": user.username, "token_type": "bearer"}
-
-
- @app.get("/users/me")
- async def read_users_me(current_user: User = Depends(get_current_active_user)):
- return current_user
现在就可以测试一下了http://127.0.0.1:8000/docs:
授权以后:
访问/users/me
会返回:
- {
- "username": "johndoe",
- "email": "johndoe@example.com",
- "full_name": "John Doe",
- "disabled": false,
- "hashed_password": "fakehashedsecret"
- }
如果logout再访问会出现:
- {
- "detail": "Not authenticated"
- }
输入错误的用户会出现:
- {
- "detail": "Inactive user"
- }
如果想使用JWT,那么先安装python-jose
。为了给密码加密,需要安装passlib
。
示例代码:
- from datetime import datetime, timedelta
- from typing import Union
-
- from fastapi import Depends, FastAPI, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from jose import JWTError, jwt
- from passlib.context import CryptContext
- from pydantic import BaseModel
-
- # to get a string like this run:
- # openssl rand -hex 32
- SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_MINUTES = 30
-
-
- fake_users_db = {
- "johndoe": {
- "username": "johndoe",
- "full_name": "John Doe",
- "email": "johndoe@example.com",
- "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
- "disabled": False,
- }
- }
-
-
- class Token(BaseModel):
- access_token: str
- token_type: str
-
-
- class TokenData(BaseModel):
- username: Union[str, None] = None
-
-
- class User(BaseModel):
- username: str
- email: Union[str, None] = None
- full_name: Union[str, None] = None
- disabled: Union[bool, None] = None
-
-
- class UserInDB(User):
- hashed_password: str
-
-
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
-
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
-
- app = FastAPI()
-
-
- def verify_password(plain_password, hashed_password):
- return pwd_context.verify(plain_password, hashed_password)
-
-
- def get_password_hash(password):
- return pwd_context.hash(password)
-
-
- def get_user(db, username: str):
- if username in db:
- user_dict = db[username]
- return UserInDB(**user_dict)
-
-
- def authenticate_user(fake_db, username: str, password: str):
- user = get_user(fake_db, username)
- if not user:
- return False
- if not verify_password(password, user.hashed_password):
- return False
- return user
-
-
- def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=15)
- to_encode.update({"exp": expire})
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- return encoded_jwt
-
-
- async def get_current_user(token: str = Depends(oauth2_scheme)):
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- token_data = TokenData(username=username)
- except JWTError:
- raise credentials_exception
- user = get_user(fake_users_db, username=token_data.username)
- if user is None:
- raise credentials_exception
- return user
-
-
- async def get_current_active_user(current_user: User = Depends(get_current_user)):
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
-
-
- @app.post("/token", response_model=Token)
- async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
- user = authenticate_user(fake_users_db, form_data.username, form_data.password)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Incorrect username or password",
- headers={"WWW-Authenticate": "Bearer"},
- )
- access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- access_token = create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires
- )
- return {"access_token": access_token, "token_type": "bearer"}
-
-
- @app.get("/users/me/", response_model=User)
- async def read_users_me(current_user: User = Depends(get_current_active_user)):
- return current_user
-
-
- @app.get("/users/me/items/")
- async def read_own_items(current_user: User = Depends(get_current_active_user)):
- return [{"item_id": "Foo", "owner": current_user.username}]
其中的SECRET_KEY通过openssl生成:
openssl rand -hex 32
中间件
FastAPI这里的中间件,指的是一个函数,它在请求处理前被调用,在响应返回前调用。有点类似于Spring的过滤器filter。
创建中间件:
- import time
-
- from fastapi import FastAPI, Request
-
- app = FastAPI()
-
-
- @app.middleware("http")
- async def add_process_time_header(request: Request, call_next):
- start_time = time.time()
- response = await call_next(request)
- process_time = time.time() - start_time
- response.headers["X-Process-Time"] = str(process_time)
- return response
CORS
Cross-Origin Resource Sharing,跨域访问。
同域包括协议、域名、端口,以下均是不同域:
http://localhost
https://localhost
http://localhost:8080
使用CORSMiddleware可以实现跨域访问:
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
-
- app = FastAPI()
-
- origins = [
- "http://localhost.tiangolo.com",
- "https://localhost.tiangolo.com",
- "http://localhost",
- "http://localhost:8080",
- ]
-
- app.add_middleware(
- CORSMiddleware,
- allow_origins=origins,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
-
-
- @app.get("/")
- async def main():
- return {"message": "Hello World"}
- allow_origins,允许域名,
[*]
代表所有; - allow_origin_regex,允许域名的正则匹配;
- allow_methods,允许请求方法,
[*]
代表所有; - allow_headers,允许请求头,
[*]
代表所有; - allow_credentials,跨域访问时是否需要cookie,默认False,设置为True时allow_origins不能设置为
[*]
; - expose_headers,暴露给浏览器的响应头,默认
[]
; - max_age,浏览器最大缓存CORS 响应的时间,默认60s;
SQL关系型数据库
官方教程使用的是SQLAlchemy。
示例:
- .
- └── sql_app
- ├── __init__.py
- ├── crud.py
- ├── database.py
- ├── main.py
- ├── models.py
- └── schemas.py
安装:
pip install sqlalchemy
创建数据库:
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import sessionmaker
-
- SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
- # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
-
- engine = create_engine(
- SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
- )
- SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-
- Base = declarative_base()
创建数据库模型:
- from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
- from sqlalchemy.orm import relationship
-
- from .database import Base
-
-
- class User(Base):
- __tablename__ = "users"
-
- id = Column(Integer, primary_key=True, index=True)
- email = Column(String, unique=True, index=True)
- hashed_password = Column(String)
- is_active = Column(Boolean, default=True)
-
- items = relationship("Item", back_populates="owner")
-
-
- class Item(Base):
- __tablename__ = "items"
-
- id = Column(Integer, primary_key=True, index=True)
- title = Column(String, index=True)
- description = Column(String, index=True)
- owner_id = Column(Integer, ForeignKey("users.id"))
-
- owner = relationship("User", back_populates="items")
创建Pydantic模型:
- from typing import List, Union
-
- from pydantic import BaseModel
-
-
- class ItemBase(BaseModel):
- title: str
- description: Union[str, None] = None
-
-
- class ItemCreate(ItemBase):
- pass
-
-
- class Item(ItemBase):
- id: int
- owner_id: int
-
- class Config:
- orm_mode = True
-
-
- class UserBase(BaseModel):
- email: str
-
-
- class UserCreate(UserBase):
- password: str
-
-
- class User(UserBase):
- id: int
- is_active: bool
- items: List[Item] = []
-
- class Config:
- orm_mode = True
注意,SQLAlchemy使用=
赋值,Pydantic使用:
赋值。
增删改查:
- from sqlalchemy.orm import Session
-
- from . import models, schemas
-
-
- def get_user(db: Session, user_id: int):
- return db.query(models.User).filter(models.User.id == user_id).first()
-
-
- def get_user_by_email(db: Session, email: str):
- return db.query(models.User).filter(models.User.email == email).first()
-
-
- def get_users(db: Session, skip: int = 0, limit: int = 100):
- return db.query(models.User).offset(skip).limit(limit).all()
-
-
- def create_user(db: Session, user: schemas.UserCreate):
- fake_hashed_password = user.password + "notreallyhashed"
- db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
- db.add(db_user)
- db.commit()
- db.refresh(db_user)
- return db_user
-
-
- def get_items(db: Session, skip: int = 0, limit: int = 100):
- return db.query(models.Item).offset(skip).limit(limit).all()
-
-
- def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
- db_item = models.Item(**item.dict(), owner_id=user_id)
- db.add(db_item)
- db.commit()
- db.refresh(db_item)
- return db_item
主程序:
- from typing import List
-
- from fastapi import Depends, FastAPI, HTTPException
- from sqlalchemy.orm import Session
-
- from . import crud, models, schemas
- from .database import SessionLocal, engine
-
- models.Base.metadata.create_all(bind=engine)
-
- app = FastAPI()
-
-
- # Dependency
- def get_db():
- db = SessionLocal()
- try:
- yield db
- finally:
- db.close()
-
-
- @app.post("/users/", response_model=schemas.User)
- def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
- db_user = crud.get_user_by_email(db, email=user.email)
- if db_user:
- raise HTTPException(status_code=400, detail="Email already registered")
- return crud.create_user(db=db, user=user)
-
-
- @app.get("/users/", response_model=List[schemas.User])
- def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
- users = crud.get_users(db, skip=skip, limit=limit)
- return users
-
-
- @app.get("/users/{user_id}", response_model=schemas.User)
- def read_user(user_id: int, db: Session = Depends(get_db)):
- db_user = crud.get_user(db, user_id=user_id)
- if db_user is None:
- raise HTTPException(status_code=404, detail="User not found")
- return db_user
-
-
- @app.post("/users/{user_id}/items/", response_model=schemas.Item)
- def create_item_for_user(
- user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
- ):
- return crud.create_user_item(db=db, item=item, user_id=user_id)
-
-
- @app.get("/items/", response_model=List[schemas.Item])
- def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
- items = crud.get_items(db, skip=skip, limit=limit)
- return items
大应用-多文件
示例目录结构:
- .
- ├── app # "app" is a Python package
- │ ├── __init__.py # this file makes "app" a "Python package"
- │ ├── main.py # "main" module, e.g. import app.main
- │ ├── dependencies.py # "dependencies" module, e.g. import app.dependencies
- │ └── routers # "routers" is a "Python subpackage"
- │ │ ├── __init__.py # makes "routers" a "Python subpackage"
- │ │ ├── items.py # "items" submodule, e.g. import app.routers.items
- │ │ └── users.py # "users" submodule, e.g. import app.routers.users
- │ └── internal # "internal" is a "Python subpackage"
- │ ├── __init__.py # makes "internal" a "Python subpackage"
- │ └── admin.py # "admin" submodule, e.g. import app.internal.admin
APIRouter用于定义子模块的路由:
- from fastapi import APIRouter
-
- router = APIRouter()
-
-
- @router.get("/users/", tags=["users"])
- async def read_users():
- return [{"username": "Rick"}, {"username": "Morty"}]
-
-
- @router.get("/users/me", tags=["users"])
- async def read_user_me():
- return {"username": "fakecurrentuser"}
-
-
- @router.get("/users/{username}", tags=["users"])
- async def read_user(username: str):
- return {"username": username}
- from fastapi import APIRouter, Depends, HTTPException
-
- from ..dependencies import get_token_header
-
- router = APIRouter(
- prefix="/items",
- tags=["items"],
- dependencies=[Depends(get_token_header)],
- responses={404: {"description": "Not found"}},
- )
-
-
- fake_items_db = {"plumbus": {"name": "Plumbus"}, "gun": {"name": "Portal Gun"}}
-
-
- @router.get("/")
- async def read_items():
- return fake_items_db
-
-
- @router.get("/{item_id}")
- async def read_item(item_id: str):
- if item_id not in fake_items_db:
- raise HTTPException(status_code=404, detail="Item not found")
- return {"name": fake_items_db[item_id]["name"], "item_id": item_id}
-
-
- @router.put(
- "/{item_id}",
- tags=["custom"],
- responses={403: {"description": "Operation forbidden"}},
- )
- async def update_item(item_id: str):
- if item_id != "plumbus":
- raise HTTPException(
- status_code=403, detail="You can only update the item: plumbus"
- )
- return {"item_id": item_id, "name": "The great Plumbus"}
在主程序中引入子模块路由:
- from fastapi import Depends, FastAPI
-
- from .dependencies import get_query_token, get_token_header
- from .internal import admin
- from .routers import items, users
-
- app = FastAPI(dependencies=[Depends(get_query_token)])
-
-
- app.include_router(users.router)
- app.include_router(items.router)
- app.include_router(
- admin.router,
- prefix="/admin",
- tags=["admin"],
- dependencies=[Depends(get_token_header)],
- responses={418: {"description": "I'm a teapot"}},
- )
-
-
- @app.get("/")
- async def root():
- return {"message": "Hello Bigger Applications!"}
后台任务
使用BackgroundTasks定义后台任务:
- from fastapi import BackgroundTasks, FastAPI
-
- app = FastAPI()
-
-
- def write_notification(email: str, message=""):
- with open("log.txt", mode="w") as email_file:
- content = f"notification for {email}: {message}"
- email_file.write(content)
-
-
- @app.post("/send-notification/{email}")
- async def send_notification(email: str, background_tasks: BackgroundTasks):
- background_tasks.add_task(write_notification, email, message="some notification")
- return {"message": "Notification sent in the background"}
BackgroundTasks也能支持依赖注入:
- from typing import Union
-
- from fastapi import BackgroundTasks, Depends, FastAPI
-
- app = FastAPI()
-
-
- def write_log(message: str):
- with open("log.txt", mode="a") as log:
- log.write(message)
-
-
- def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
- if q:
- message = f"found query: {q}\n"
- background_tasks.add_task(write_log, message)
- return q
-
-
- @app.post("/send-notification/{email}")
- async def send_notification(
- email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
- ):
- message = f"message to {email}\n"
- background_tasks.add_task(write_log, message)
- return {"message": "Message sent"}
元数据和文档URL
设置应用元数据:
- from fastapi import FastAPI
-
- description = """
- ChimichangApp API helps you do awesome stuff.
- ## Items
- You can **read items**.
- ## Users
- You will be able to:
- * **Create users** (_not implemented_).
- * **Read users** (_not implemented_).
- """
-
- app = FastAPI(
- title="ChimichangApp",
- description=description,
- version="0.0.1",
- terms_of_service="http://example.com/terms/",
- contact={
- "name": "Deadpoolio the Amazing",
- "url": "http://x-force.example.com/contact/",
- "email": "dp@x-force.example.com",
- },
- license_info={
- "name": "Apache 2.0",
- "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
- },
- )
-
-
- @app.get("/items/")
- async def read_items():
- return [{"name": "Katana"}]
效果:
设置tag元数据:
- from fastapi import FastAPI
-
- tags_metadata = [
- {
- "name": "users",
- "description": "Operations with users. The **login** logic is also here.",
- },
- {
- "name": "items",
- "description": "Manage items. So _fancy_ they have their own docs.",
- "externalDocs": {
- "description": "Items external docs",
- "url": "https://fastapi.tiangolo.com/",
- },
- },
- ]
-
- app = FastAPI(openapi_tags=tags_metadata)
-
-
- @app.get("/users/", tags=["users"])
- async def get_users():
- return [{"name": "Harry"}, {"name": "Ron"}]
-
-
- @app.get("/items/", tags=["items"])
- async def get_items():
- return [{"name": "wand"}, {"name": "flying broom"}]
添加tag:
- from fastapi import FastAPI
-
- tags_metadata = [
- {
- "name": "users",
- "description": "Operations with users. The **login** logic is also here.",
- },
- {
- "name": "items",
- "description": "Manage items. So _fancy_ they have their own docs.",
- "externalDocs": {
- "description": "Items external docs",
- "url": "https://fastapi.tiangolo.com/",
- },
- },
- ]
-
- app = FastAPI(openapi_tags=tags_metadata)
-
-
- @app.get("/users/", tags=["users"])
- async def get_users():
- return [{"name": "Harry"}, {"name": "Ron"}]
-
-
- @app.get("/items/", tags=["items"])
- async def get_items():
- return [{"name": "wand"}, {"name": "flying broom"}]
效果:
OpenAPI的URL默认是/openapi.json
,设置/api/v1/openapi.json
:
- from fastapi import FastAPI
-
- app = FastAPI(openapi_url="/api/v1/openapi.json")
-
-
- @app.get("/items/")
- async def read_items():
- return [{"name": "Foo"}]
文档的URL默认是/docs
,设置为/documentation
:
- from fastapi import FastAPI
-
- app = FastAPI(docs_url="/documentation", redoc_url=None)
-
-
- @app.get("/items/")
- async def read_items():
- return [{"name": "Foo"}]
静态文件
使用StaticFiles
:
- from fastapi import FastAPI
- from fastapi.staticfiles import StaticFiles
-
- app = FastAPI()
-
- app.mount("/static", StaticFiles(directory="static"), name="static")
FastAPI会自动挂载静态文件。
单元测试
使用pytest和TestClient:
- from fastapi import FastAPI
- from fastapi.testclient import TestClient
-
- app = FastAPI()
-
-
- @app.get("/")
- async def read_main():
- return {"msg": "Hello World"}
-
-
- client = TestClient(app)
-
-
- def test_read_main():
- response = client.get("/")
- assert response.status_code == 200
- assert response.json() == {"msg": "Hello World"}
单元测试文件拆出来:
- .
- ├── app
- │ ├── __init__.py
- │ ├── main.py
- │ └── test_main.py
- from fastapi import FastAPI
-
- app = FastAPI()
-
-
- @app.get("/")
- async def read_main():
- return {"msg": "Hello World"}
- from fastapi.testclient import TestClient
-
- from .main import app
-
- client = TestClient(app)
-
-
- def test_read_main():
- response = client.get("/")
- assert response.status_code == 200
- assert response.json() == {"msg": "Hello World"}
调试
通常借助PyCharm打断点调试:
参考资料: