当前位置:   article > 正文

FastAPI官方教程太棒了(下)

@app.post("/items/", status_code=201) postman

响应状态码

@app.post()方法中添加status_code参数:

  1. from fastapi import FastAPI
  2. app = FastAPI()
  3. @app.post("/items/", status_code=201)
  4. async def create_item(name: str):
  5. return {"name": name}

status_code也可以是IntEnum,比如Python的http.HTTPStatus

常见响应状态码:

  • 100以上,信息;很少直接使用;

  • 200以上,成功;200是OK,201是Created,204是No Content;

  • 300以上,重定向;304是Not Modified;

  • 400以上,客户端错误;404是Not Found;

  • 500以上,服务器错误;

FastAPI引入了status,可以方便的录入这些状态:

  1. from fastapi import FastAPI, status
  2. app = FastAPI()
  3. @app.post("/items/", status_code=status.HTTP_201_CREATED)
  4. async def create_item(name: str):
  5. return {"name": name}

表单数据

为了使用表单,首先需要安装python-multipart:

pip install python-multipart

示例:

  1. from fastapi import FastAPI, Form
  2. app = FastAPI()
  3. @app.post("/login/")
  4. async def login(username: str = Form(), password: str = Form()):
  5. return {"username": username}

表单由HTML中的<form></form>发送,请求头的content-type一般是application/x-www-form-urlencoded,当为文件时multipart/form-data

请求文件

示例:

  1. from fastapi import FastAPI, File, UploadFile
  2. app = FastAPI()
  3. @app.post("/files/")
  4. async def create_file(file: bytes = File()):
  5. return {"file_size": len(file)}
  6. @app.post("/uploadfile/")
  7. async def create_upload_file(file: UploadFile):
  8. return {"filename": file.filename}

create_file()的类型为bytes,接收到的文件内容也是bytes,数据都存在于内存中,适用于小文件。create_upload_file()的类型为UploadFile,它会在内存设置一个最大存储,超出最大存储,就会把数据转存到磁盘,适用于大文件。

UploadFile有以下属性:

  • filename,文件名,比如myimage.jpg;

  • content_type,文件类型,比如image/jpeg;

  • file,SpooledTemporaryFile实例,一个file-like对象。

UploadFile有以下方法:

  • write(data):写数据(str或bytes)到文件;

  • read(size):从文件读size(int)大小的bytes或character;

  • seek(offset):定位到文件中offset(int)的位置,比如await myfile.seek(0)会定位到文件开始;

  • close():关闭文件;

所有这些方法都是async的,需要await:

contents = await myfile.read()

不想await就使用其中的file对象:

contents = myfile.file.read()

文件可选非必传:

  1. from typing import Union
  2. from fastapi import FastAPI, File, UploadFile
  3. app = FastAPI()
  4. @app.post("/files/")
  5. async def create_file(file: Union[bytes, None] = File(default=None)):
  6. if not file:
  7. return {"message": "No file sent"}
  8. else:
  9. return {"file_size": len(file)}
  10. @app.post("/uploadfile/")
  11. async def create_upload_file(file: Union[UploadFile, None] = None):
  12. if not file:
  13. return {"message": "No upload file sent"}
  14. else:
  15. return {"filename": file.filename}

UploadFile的metadata:

  1. from fastapi import FastAPI, File, UploadFile
  2. app = FastAPI()
  3. @app.post("/files/")
  4. async def create_file(file: bytes = File(description="A file read as bytes")):
  5. return {"file_size": len(file)}
  6. @app.post("/uploadfile/")
  7. async def create_upload_file(
  8. file: UploadFile = File(description="A file read as UploadFile"),
  9. ):
  10. return {"filename": file.filename}

多文件上传:

  1. from typing import List
  2. from fastapi import FastAPI, File, UploadFile
  3. from fastapi.responses import HTMLResponse
  4. app = FastAPI()
  5. @app.post("/files/")
  6. async def create_files(files: List[bytes] = File()):
  7. return {"file_sizes": [len(file) for file in files]}
  8. @app.post("/uploadfiles/")
  9. async def create_upload_files(files: List[UploadFile]):
  10. return {"filenames": [file.filename for file in files]}
  11. @app.get("/")
  12. async def main():
  13. content = """
  14. <body>
  15. <form action="/files/" enctype="multipart/form-data" method="post">
  16. <input name="files" type="file" multiple>
  17. <input type="submit">
  18. </form>
  19. <form action="/uploadfiles/" enctype="multipart/form-data" method="post">
  20. <input name="files" type="file" multiple>
  21. <input type="submit">
  22. </form>
  23. </body>
  24. """
  25. return HTMLResponse(content=content)

同时请求表单和文件

示例:

  1. from fastapi import FastAPI, File, Form, UploadFile
  2. app = FastAPI()
  3. @app.post("/files/")
  4. async def create_file(
  5. file: bytes = File(), fileb: UploadFile = File(), token: str = Form()
  6. ):
  7. return {
  8. "file_size": len(file),
  9. "token": token,
  10. "fileb_content_type": fileb.content_type,
  11. }

错误处理

FastAPI提供了HTTPException:

  1. from fastapi import FastAPI, HTTPException
  2. app = FastAPI()
  3. items = {"foo": "The Foo Wrestlers"}
  4. @app.get("/items/{item_id}")
  5. async def read_item(item_id: str):
  6. if item_id not in items:
  7. raise HTTPException(status_code=404, detail="Item not found")
  8. return {"item": items[item_id]}

HTTPException不是return而是raise的,抛出异常。

对于抛出的异常,可以使用@app.exception_handler自定义handler进行处理:

  1. from fastapi import FastAPI, Request
  2. from fastapi.responses import JSONResponse
  3. class UnicornException(Exception):
  4. def __init__(self, name: str):
  5. self.name = name
  6. app = FastAPI()
  7. @app.exception_handler(UnicornException)
  8. async def unicorn_exception_handler(request: Request, exc: UnicornException):
  9. return JSONResponse(
  10. status_code=418,
  11. content={"message": f"Oops! {exc.name} did something. There goes a rainbow..."},
  12. )
  13. @app.get("/unicorns/{name}")
  14. async def read_unicorn(name: str):
  15. if name == "yolo":
  16. raise UnicornException(name=name)
  17. return {"unicorn_name": name}

在抛出HTTPException异常时,FastAPI有很多默认的handler,比如RequestValidationError,可以使用此方法重写默认的handler:

  1. from fastapi import FastAPI, HTTPException
  2. from fastapi.exceptions import RequestValidationError
  3. from fastapi.responses import PlainTextResponse
  4. from starlette.exceptions import HTTPException as StarletteHTTPException
  5. app = FastAPI()
  6. @app.exception_handler(StarletteHTTPException)
  7. async def http_exception_handler(request, exc):
  8. return PlainTextResponse(str(exc.detail), status_code=exc.status_code)
  9. @app.exception_handler(RequestValidationError)
  10. async def validation_exception_handler(request, exc):
  11. return PlainTextResponse(str(exc), status_code=400)
  12. @app.get("/items/{item_id}")
  13. async def read_item(item_id: int):
  14. if item_id == 3:
  15. raise HTTPException(status_code=418, detail="Nope! I don't like 3.")
  16. return {"item_id": item_id}

默认handler会返回:

  1. {
  2. "detail": [
  3. {
  4. "loc": [
  5. "path",
  6. "item_id"
  7. ],
  8. "msg": "value is not a valid integer",
  9. "type": "type_error.integer"
  10. }
  11. ]
  12. }

而重写handler后会返回字符串:

  1. 1 validation error
  2. path -> item_id
  3. value is not a valid integer (type=type_error.integer)

如果不想改动默认handler,只是补充点信息,可以导入http_exception_handler和request_validation_exception_handler:

  1. from fastapi import FastAPI, HTTPException
  2. from fastapi.exception_handlers import (
  3. http_exception_handler,
  4. request_validation_exception_handler,
  5. )
  6. from fastapi.exceptions import RequestValidationError
  7. from starlette.exceptions import HTTPException as StarletteHTTPException
  8. app = FastAPI()
  9. @app.exception_handler(StarletteHTTPException)
  10. async def custom_http_exception_handler(request, exc):
  11. print(f"OMG! An HTTP error!: {repr(exc)}")
  12. return await http_exception_handler(request, exc)
  13. @app.exception_handler(RequestValidationError)
  14. async def validation_exception_handler(request, exc):
  15. print(f"OMG! The client sent invalid data!: {exc}")
  16. return await request_validation_exception_handler(request, exc)
  17. @app.get("/items/{item_id}")
  18. async def read_item(item_id: int):
  19. if item_id == 3:
  20. raise HTTPException(status_code=418, detail="Nope! I don't like 3.")
  21. return {"item_id": item_id}

路径操作配置

响应状态码:

  1. from typing import Set, Union
  2. from fastapi import FastAPI, status
  3. from pydantic import BaseModel
  4. app = FastAPI()
  5. class Item(BaseModel):
  6. name: str
  7. description: Union[str, None] = None
  8. price: float
  9. tax: Union[float, None] = None
  10. tags: Set[str] = set()
  11. @app.post("/items/", response_model=Item, status_code=status.HTTP_201_CREATED)
  12. async def create_item(item: Item):
  13. return item

标签:

  1. from typing import Set, Union
  2. from fastapi import FastAPI
  3. from pydantic import BaseModel
  4. app = FastAPI()
  5. class Item(BaseModel):
  6. name: str
  7. description: Union[str, None] = None
  8. price: float
  9. tax: Union[float, None] = None
  10. tags: Set[str] = set()
  11. @app.post("/items/", response_model=Item, tags=["items"])
  12. async def create_item(item: Item):
  13. return item
  14. @app.get("/items/", tags=["items"])
  15. async def read_items():
  16. return [{"name": "Foo", "price": 42}]
  17. @app.get("/users/", tags=["users"])
  18. async def read_users():
  19. return [{"username": "johndoe"}]

标签枚举:

  1. from enum import Enum
  2. from fastapi import FastAPI
  3. app = FastAPI()
  4. class Tags(Enum):
  5. items = "items"
  6. users = "users"
  7. @app.get("/items/", tags=[Tags.items])
  8. async def get_items():
  9. return ["Portal gun", "Plumbus"]
  10. @app.get("/users/", tags=[Tags.users])
  11. async def read_users():
  12. return ["Rick", "Morty"]

概要和描述:

  1. from typing import Set, Union
  2. from fastapi import FastAPI
  3. from pydantic import BaseModel
  4. app = FastAPI()
  5. class Item(BaseModel):
  6. name: str
  7. description: Union[str, None] = None
  8. price: float
  9. tax: Union[float, None] = None
  10. tags: Set[str] = set()
  11. @app.post(
  12. "/items/",
  13. response_model=Item,
  14. summary="Create an item",
  15. description="Create an item with all the information, name, description, price, tax and a set of unique tags",
  16. )
  17. async def create_item(item: Item):
  18. return item

文档字符串:

  1. from typing import Set, Union
  2. from fastapi import FastAPI
  3. from pydantic import BaseModel
  4. app = FastAPI()
  5. class Item(BaseModel):
  6. name: str
  7. description: Union[str, None] = None
  8. price: float
  9. tax: Union[float, None] = None
  10. tags: Set[str] = set()
  11. @app.post("/items/", response_model=Item, summary="Create an item")
  12. async def create_item(item: Item):
  13. """
  14. Create an item with all the information:
  15. - **name**: each item must have a name
  16. - **description**: a long description
  17. - **price**: required
  18. - **tax**: if the item doesn't have tax, you can omit this
  19. - **tags**: a set of unique tag strings for this item
  20. """
  21. return item

响应描述:

  1. from typing import Set, Union
  2. from fastapi import FastAPI
  3. from pydantic import BaseModel
  4. app = FastAPI()
  5. class Item(BaseModel):
  6. name: str
  7. description: Union[str, None] = None
  8. price: float
  9. tax: Union[float, None] = None
  10. tags: Set[str] = set()
  11. @app.post(
  12. "/items/",
  13. response_model=Item,
  14. summary="Create an item",
  15. response_description="The created item",
  16. )
  17. async def create_item(item: Item):
  18. """
  19. Create an item with all the information:
  20. - **name**: each item must have a name
  21. - **description**: a long description
  22. - **price**: required
  23. - **tax**: if the item doesn't have tax, you can omit this
  24. - **tags**: a set of unique tag strings for this item
  25. """
  26. return item

标记为deprecated:

  1. from fastapi import FastAPI
  2. app = FastAPI()
  3. @app.get("/items/", tags=["items"])
  4. async def read_items():
  5. return [{"name": "Foo", "price": 42}]
  6. @app.get("/users/", tags=["users"])
  7. async def read_users():
  8. return [{"username": "johndoe"}]
  9. @app.get("/elements/", tags=["items"], deprecated=True)
  10. async def read_elements():
  11. return [{"item_id": "Foo"}]

JSON兼容编码器

jsonable_encoder()函数的作用是把Pydantic model转换成JSON兼容的类型比如dict、list等。

  1. from datetime import datetime
  2. from typing import Union
  3. from fastapi import FastAPI
  4. from fastapi.encoders import jsonable_encoder
  5. from pydantic import BaseModel
  6. fake_db = {}
  7. class Item(BaseModel):
  8. title: str
  9. timestamp: datetime
  10. description: Union[str, None] = None
  11. app = FastAPI()
  12. @app.put("/items/{id}")
  13. def update_item(id: str, item: Item):
  14. json_compatible_item_data = jsonable_encoder(item)
  15. fake_db[id] = json_compatible_item_data

Body-更新

使用PUT:

  1. from typing import List, Union
  2. from fastapi import FastAPI
  3. from fastapi.encoders import jsonable_encoder
  4. from pydantic import BaseModel
  5. app = FastAPI()
  6. class Item(BaseModel):
  7. name: Union[str, None] = None
  8. description: Union[str, None] = None
  9. price: Union[float, None] = None
  10. tax: float = 10.5
  11. tags: List[str] = []
  12. items = {
  13. "foo": {"name": "Foo", "price": 50.2},
  14. "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
  15. "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
  16. }
  17. @app.get("/items/{item_id}", response_model=Item)
  18. async def read_item(item_id: str):
  19. return items[item_id]
  20. @app.put("/items/{item_id}", response_model=Item)
  21. async def update_item(item_id: str, item: Item):
  22. update_item_encoded = jsonable_encoder(item)
  23. items[item_id] = update_item_encoded
  24. return update_item_encoded

输入数据使用了jsonable_encoder()函数转换为JSON兼容类型。

使用PATCH:

exclude_unset=True

  1. from typing import List, Union
  2. from fastapi import FastAPI
  3. from fastapi.encoders import jsonable_encoder
  4. from pydantic import BaseModel
  5. app = FastAPI()
  6. class Item(BaseModel):
  7. name: Union[str, None] = None
  8. description: Union[str, None] = None
  9. price: Union[float, None] = None
  10. tax: float = 10.5
  11. tags: List[str] = []
  12. items = {
  13. "foo": {"name": "Foo", "price": 50.2},
  14. "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
  15. "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
  16. }
  17. @app.get("/items/{item_id}", response_model=Item)
  18. async def read_item(item_id: str):
  19. return items[item_id]
  20. @app.patch("/items/{item_id}", response_model=Item)
  21. async def update_item(item_id: str, item: Item):
  22. stored_item_data = items[item_id]
  23. stored_item_model = Item(**stored_item_data)
  24. update_data = item.dict(exclude_unset=True)
  25. updated_item = stored_item_model.copy(update=update_data)
  26. items[item_id] = jsonable_encoder(updated_item)
  27. return updated_item

.copy(update=update_data)

  1. from typing import List, Union
  2. from fastapi import FastAPI
  3. from fastapi.encoders import jsonable_encoder
  4. from pydantic import BaseModel
  5. app = FastAPI()
  6. class Item(BaseModel):
  7. name: Union[str, None] = None
  8. description: Union[str, None] = None
  9. price: Union[float, None] = None
  10. tax: float = 10.5
  11. tags: List[str] = []
  12. items = {
  13. "foo": {"name": "Foo", "price": 50.2},
  14. "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
  15. "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
  16. }
  17. @app.get("/items/{item_id}", response_model=Item)
  18. async def read_item(item_id: str):
  19. return items[item_id]
  20. @app.patch("/items/{item_id}", response_model=Item)
  21. async def update_item(item_id: str, item: Item):
  22. stored_item_data = items[item_id]
  23. stored_item_model = Item(**stored_item_data)
  24. update_data = item.dict(exclude_unset=True)
  25. updated_item = stored_item_model.copy(update=update_data)
  26. items[item_id] = jsonable_encoder(updated_item)
  27. return updated_item

PUT和PATCH都可以用来部分更新,PUT用的更多。

依赖

什么是依赖注入?在FastAPI里面,你可以在路径操作函数中添加依赖的声明,然后FastAPI会自动加载这些依赖。

依赖注入的好处有:

  • 复用代码;
  • 复用数据库连接;
  • 增强安全、认证、角色;
  • 等等等;

依赖注入示例:

  1. from typing import Union
  2. from fastapi import Depends, FastAPI
  3. app = FastAPI()
  4. async def common_parameters(
  5. q: Union[str, None] = None, skip: int = 0, limit: int = 100
  6. ):
  7. return {"q": q, "skip": skip, "limit": limit}
  8. @app.get("/items/")
  9. async def read_items(commons: dict = Depends(common_parameters)):
  10. return commons
  11. @app.get("/users/")
  12. async def read_users(commons: dict = Depends(common_parameters)):
  13. return commons
  • common_parameters()函数是个简单的依赖;
  • Depends引入依赖;

FastAPI就会自动调用common_parameters()函数并把结果返回给commons,而无需任何其他代码。

依赖也可以使用class,把common_parameters()函数改为CommonQueryParams类:

  1. from typing import Union
  2. from fastapi import Depends, FastAPI
  3. app = FastAPI()
  4. fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]
  5. class CommonQueryParams:
  6. def __init__(self, q: Union[str, None] = None, skip: int = 0, limit: int = 100):
  7. self.q = q
  8. self.skip = skip
  9. self.limit = limit
  10. @app.get("/items/")
  11. async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)):
  12. response = {}
  13. if commons.q:
  14. response.update({"q": commons.q})
  15. items = fake_items_db[commons.skip : commons.skip + commons.limit]
  16. response.update({"items": items})
  17. return response

Depends会创建一个CommonQueryParams的实例然后赋值给commons。

更一步简化,只写1次CommonQueryParams:

commons: CommonQueryParams = Depends()

Depends()里面的CommonQueryParams可以省略掉。

FastAPI支持子依赖,也就是Depends嵌套:

  1. from typing import Union
  2. from fastapi import Cookie, Depends, FastAPI
  3. app = FastAPI()
  4. def query_extractor(q: Union[str, None] = None):
  5. return q
  6. def query_or_cookie_extractor(
  7. q: str = Depends(query_extractor),
  8. last_query: Union[str, None] = Cookie(default=None),
  9. ):
  10. if not q:
  11. return last_query
  12. return q
  13. @app.get("/items/")
  14. async def read_query(query_or_default: str = Depends(query_or_cookie_extractor)):
  15. return {"q_or_cookie": query_or_default}

如果使用同一个依赖多次,FastAPI默认会只注入一次。可以按以下设置让FastAPI注入多次:

  1. async def needy_dependency(fresh_value: str = Depends(get_value, use_cache=False)):
  2. return {"fresh_value": fresh_value}

多个依赖可以用dependencies的list:

  1. from fastapi import Depends, FastAPI, Header, HTTPException
  2. app = FastAPI()
  3. async def verify_token(x_token: str = Header()):
  4. if x_token != "fake-super-secret-token":
  5. raise HTTPException(status_code=400, detail="X-Token header invalid")
  6. async def verify_key(x_key: str = Header()):
  7. if x_key != "fake-super-secret-key":
  8. raise HTTPException(status_code=400, detail="X-Key header invalid")
  9. return x_key
  10. @app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)])
  11. async def read_items():
  12. return [{"item": "Foo"}, {"item": "Bar"}]

如果给FastAPI的构造函数传入dependencies,那么就是全局依赖:

  1. from fastapi import Depends, FastAPI, Header, HTTPException
  2. async def verify_token(x_token: str = Header()):
  3. if x_token != "fake-super-secret-token":
  4. raise HTTPException(status_code=400, detail="X-Token header invalid")
  5. async def verify_key(x_key: str = Header()):
  6. if x_key != "fake-super-secret-key":
  7. raise HTTPException(status_code=400, detail="X-Key header invalid")
  8. return x_key
  9. app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)])
  10. @app.get("/items/")
  11. async def read_items():
  12. return [{"item": "Portal Gun"}, {"item": "Plumbus"}]
  13. @app.get("/users/")
  14. async def read_users():
  15. return [{"username": "Rick"}, {"username": "Morty"}]

如果在依赖函数中使用yield,它后面的代码就相当于teardown,这点用法跟pytest的fixture类似:

  1. async def get_db():
  2. db = DBSession()
  3. try:
  4. yield db
  5. finally:
  6. db.close()

另外,借助yield和with可以创建一个上下文管理器(实现__enter____exit__):

  1. class MySuperContextManager:
  2. def __init__(self):
  3. self.db = DBSession()
  4. def __enter__(self):
  5. return self.db
  6. def __exit__(self, exc_type, exc_value, traceback):
  7. self.db.close()
  8. async def get_db():
  9. with MySuperContextManager() as db:
  10. yield db

安全

FastAPI支持OAuth2协议:

  1. from fastapi import Depends, FastAPI
  2. from fastapi.security import OAuth2PasswordBearer
  3. app = FastAPI()
  4. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  5. @app.get("/items/")
  6. async def read_items(token: str = Depends(oauth2_scheme)):
  7. return {"token": token}

备注:需要提前安装 pip install python-multipart,因为OAuth2使用表单来发送username和password。虽然这个接口已经加上鉴权了。但这些入参都没有生效,因为我们还没有添加相应的处理代码。为了让鉴权实际生效,我们继续添加代码:

  1. from typing import Union
  2. from fastapi import Depends, FastAPI
  3. from fastapi.security import OAuth2PasswordBearer
  4. from pydantic import BaseModel
  5. app = FastAPI()
  6. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  7. class User(BaseModel):
  8. username: str
  9. email: Union[str, None] = None
  10. full_name: Union[str, None] = None
  11. disabled: Union[bool, None] = None
  12. def fake_decode_token(token):
  13. return User(
  14. username=token + "fakedecoded", email="john@example.com", full_name="John Doe"
  15. )
  16. async def get_current_user(token: str = Depends(oauth2_scheme)):
  17. user = fake_decode_token(token)
  18. return user
  19. @app.get("/users/me")
  20. async def read_users_me(current_user: User = Depends(get_current_user)):
  21. return current_user
  1. 定义模型User;
  2. 创建依赖get_current_user;
  3. fake_decode_token接收token,返回模拟的假用户;
  4. read_users_me注入依赖;

然后实现username和password:

  1. from typing import Union
  2. from fastapi import Depends, FastAPI, HTTPException, status
  3. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  4. from pydantic import BaseModel
  5. fake_users_db = {
  6. "johndoe": {
  7. "username": "johndoe",
  8. "full_name": "John Doe",
  9. "email": "johndoe@example.com",
  10. "hashed_password": "fakehashedsecret",
  11. "disabled": False,
  12. },
  13. "alice": {
  14. "username": "alice",
  15. "full_name": "Alice Wonderson",
  16. "email": "alice@example.com",
  17. "hashed_password": "fakehashedsecret2",
  18. "disabled": True,
  19. },
  20. }
  21. app = FastAPI()
  22. def fake_hash_password(password: str):
  23. return "fakehashed" + password
  24. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  25. class User(BaseModel):
  26. username: str
  27. email: Union[str, None] = None
  28. full_name: Union[str, None] = None
  29. disabled: Union[bool, None] = None
  30. class UserInDB(User):
  31. hashed_password: str
  32. def get_user(db, username: str):
  33. if username in db:
  34. user_dict = db[username]
  35. return UserInDB(**user_dict)
  36. def fake_decode_token(token):
  37. # This doesn't provide any security at all
  38. # Check the next version
  39. user = get_user(fake_users_db, token)
  40. return user
  41. async def get_current_user(token: str = Depends(oauth2_scheme)):
  42. user = fake_decode_token(token)
  43. if not user:
  44. raise HTTPException(
  45. status_code=status.HTTP_401_UNAUTHORIZED,
  46. detail="Invalid authentication credentials",
  47. headers={"WWW-Authenticate": "Bearer"},
  48. )
  49. return user
  50. async def get_current_active_user(current_user: User = Depends(get_current_user)):
  51. if current_user.disabled:
  52. raise HTTPException(status_code=400, detail="Inactive user")
  53. return current_user
  54. @app.post("/token")
  55. async def login(form_data: OAuth2PasswordRequestForm = Depends()):
  56. user_dict = fake_users_db.get(form_data.username)
  57. if not user_dict:
  58. raise HTTPException(status_code=400, detail="Incorrect username or password")
  59. user = UserInDB(**user_dict)
  60. hashed_password = fake_hash_password(form_data.password)
  61. if not hashed_password == user.hashed_password:
  62. raise HTTPException(status_code=400, detail="Incorrect username or password")
  63. return {"access_token": user.username, "token_type": "bearer"}
  64. @app.get("/users/me")
  65. async def read_users_me(current_user: User = Depends(get_current_active_user)):
  66. return current_user

现在就可以测试一下了http://127.0.0.1:8000/docs:

img

授权以后:

img

访问/users/me会返回:

  1. {
  2. "username": "johndoe",
  3. "email": "johndoe@example.com",
  4. "full_name": "John Doe",
  5. "disabled": false,
  6. "hashed_password": "fakehashedsecret"
  7. }

如果logout再访问会出现:

  1. {
  2. "detail": "Not authenticated"
  3. }

输入错误的用户会出现:

  1. {
  2. "detail": "Inactive user"
  3. }

如果想使用JWT,那么先安装python-jose。为了给密码加密,需要安装passlib

示例代码:

  1. from datetime import datetime, timedelta
  2. from typing import Union
  3. from fastapi import Depends, FastAPI, HTTPException, status
  4. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  5. from jose import JWTError, jwt
  6. from passlib.context import CryptContext
  7. from pydantic import BaseModel
  8. # to get a string like this run:
  9. # openssl rand -hex 32
  10. SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
  11. ALGORITHM = "HS256"
  12. ACCESS_TOKEN_EXPIRE_MINUTES = 30
  13. fake_users_db = {
  14. "johndoe": {
  15. "username": "johndoe",
  16. "full_name": "John Doe",
  17. "email": "johndoe@example.com",
  18. "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
  19. "disabled": False,
  20. }
  21. }
  22. class Token(BaseModel):
  23. access_token: str
  24. token_type: str
  25. class TokenData(BaseModel):
  26. username: Union[str, None] = None
  27. class User(BaseModel):
  28. username: str
  29. email: Union[str, None] = None
  30. full_name: Union[str, None] = None
  31. disabled: Union[bool, None] = None
  32. class UserInDB(User):
  33. hashed_password: str
  34. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  35. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  36. app = FastAPI()
  37. def verify_password(plain_password, hashed_password):
  38. return pwd_context.verify(plain_password, hashed_password)
  39. def get_password_hash(password):
  40. return pwd_context.hash(password)
  41. def get_user(db, username: str):
  42. if username in db:
  43. user_dict = db[username]
  44. return UserInDB(**user_dict)
  45. def authenticate_user(fake_db, username: str, password: str):
  46. user = get_user(fake_db, username)
  47. if not user:
  48. return False
  49. if not verify_password(password, user.hashed_password):
  50. return False
  51. return user
  52. def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
  53. to_encode = data.copy()
  54. if expires_delta:
  55. expire = datetime.utcnow() + expires_delta
  56. else:
  57. expire = datetime.utcnow() + timedelta(minutes=15)
  58. to_encode.update({"exp": expire})
  59. encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  60. return encoded_jwt
  61. async def get_current_user(token: str = Depends(oauth2_scheme)):
  62. credentials_exception = HTTPException(
  63. status_code=status.HTTP_401_UNAUTHORIZED,
  64. detail="Could not validate credentials",
  65. headers={"WWW-Authenticate": "Bearer"},
  66. )
  67. try:
  68. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  69. username: str = payload.get("sub")
  70. if username is None:
  71. raise credentials_exception
  72. token_data = TokenData(username=username)
  73. except JWTError:
  74. raise credentials_exception
  75. user = get_user(fake_users_db, username=token_data.username)
  76. if user is None:
  77. raise credentials_exception
  78. return user
  79. async def get_current_active_user(current_user: User = Depends(get_current_user)):
  80. if current_user.disabled:
  81. raise HTTPException(status_code=400, detail="Inactive user")
  82. return current_user
  83. @app.post("/token", response_model=Token)
  84. async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
  85. user = authenticate_user(fake_users_db, form_data.username, form_data.password)
  86. if not user:
  87. raise HTTPException(
  88. status_code=status.HTTP_401_UNAUTHORIZED,
  89. detail="Incorrect username or password",
  90. headers={"WWW-Authenticate": "Bearer"},
  91. )
  92. access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  93. access_token = create_access_token(
  94. data={"sub": user.username}, expires_delta=access_token_expires
  95. )
  96. return {"access_token": access_token, "token_type": "bearer"}
  97. @app.get("/users/me/", response_model=User)
  98. async def read_users_me(current_user: User = Depends(get_current_active_user)):
  99. return current_user
  100. @app.get("/users/me/items/")
  101. async def read_own_items(current_user: User = Depends(get_current_active_user)):
  102. return [{"item_id": "Foo", "owner": current_user.username}]

其中的SECRET_KEY通过openssl生成:

openssl rand -hex 32

中间件

FastAPI这里的中间件,指的是一个函数,它在请求处理前被调用,在响应返回前调用。有点类似于Spring的过滤器filter。

创建中间件:

  1. import time
  2. from fastapi import FastAPI, Request
  3. app = FastAPI()
  4. @app.middleware("http")
  5. async def add_process_time_header(request: Request, call_next):
  6. start_time = time.time()
  7. response = await call_next(request)
  8. process_time = time.time() - start_time
  9. response.headers["X-Process-Time"] = str(process_time)
  10. return response

CORS

Cross-Origin Resource Sharing,跨域访问。

同域包括协议、域名、端口,以下均是不同域:

  • http://localhost
  • https://localhost
  • http://localhost:8080

使用CORSMiddleware可以实现跨域访问:

  1. from fastapi import FastAPI
  2. from fastapi.middleware.cors import CORSMiddleware
  3. app = FastAPI()
  4. origins = [
  5. "http://localhost.tiangolo.com",
  6. "https://localhost.tiangolo.com",
  7. "http://localhost",
  8. "http://localhost:8080",
  9. ]
  10. app.add_middleware(
  11. CORSMiddleware,
  12. allow_origins=origins,
  13. allow_credentials=True,
  14. allow_methods=["*"],
  15. allow_headers=["*"],
  16. )
  17. @app.get("/")
  18. async def main():
  19. return {"message": "Hello World"}
  • allow_origins,允许域名,[*]代表所有;
  • allow_origin_regex,允许域名的正则匹配;
  • allow_methods,允许请求方法,[*]代表所有;
  • allow_headers,允许请求头,[*]代表所有;
  • allow_credentials,跨域访问时是否需要cookie,默认False,设置为True时allow_origins不能设置为[*]
  • expose_headers,暴露给浏览器的响应头,默认[]
  • max_age,浏览器最大缓存CORS 响应的时间,默认60s;

SQL关系型数据库

官方教程使用的是SQLAlchemy。

示例:

  1. .
  2. └── sql_app
  3. ├── __init__.py
  4. ├── crud.py
  5. ├── database.py
  6. ├── main.py
  7. ├── models.py
  8. └── schemas.py

安装:

pip install sqlalchemy

创建数据库:

  1. from sqlalchemy import create_engine
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy.orm import sessionmaker
  4. SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
  5. # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
  6. engine = create_engine(
  7. SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
  8. )
  9. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  10. Base = declarative_base()

创建数据库模型:

  1. from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
  2. from sqlalchemy.orm import relationship
  3. from .database import Base
  4. class User(Base):
  5. __tablename__ = "users"
  6. id = Column(Integer, primary_key=True, index=True)
  7. email = Column(String, unique=True, index=True)
  8. hashed_password = Column(String)
  9. is_active = Column(Boolean, default=True)
  10. items = relationship("Item", back_populates="owner")
  11. class Item(Base):
  12. __tablename__ = "items"
  13. id = Column(Integer, primary_key=True, index=True)
  14. title = Column(String, index=True)
  15. description = Column(String, index=True)
  16. owner_id = Column(Integer, ForeignKey("users.id"))
  17. owner = relationship("User", back_populates="items")

创建Pydantic模型:

  1. from typing import List, Union
  2. from pydantic import BaseModel
  3. class ItemBase(BaseModel):
  4. title: str
  5. description: Union[str, None] = None
  6. class ItemCreate(ItemBase):
  7. pass
  8. class Item(ItemBase):
  9. id: int
  10. owner_id: int
  11. class Config:
  12. orm_mode = True
  13. class UserBase(BaseModel):
  14. email: str
  15. class UserCreate(UserBase):
  16. password: str
  17. class User(UserBase):
  18. id: int
  19. is_active: bool
  20. items: List[Item] = []
  21. class Config:
  22. orm_mode = True

注意,SQLAlchemy使用=赋值,Pydantic使用:赋值。

增删改查:

  1. from sqlalchemy.orm import Session
  2. from . import models, schemas
  3. def get_user(db: Session, user_id: int):
  4. return db.query(models.User).filter(models.User.id == user_id).first()
  5. def get_user_by_email(db: Session, email: str):
  6. return db.query(models.User).filter(models.User.email == email).first()
  7. def get_users(db: Session, skip: int = 0, limit: int = 100):
  8. return db.query(models.User).offset(skip).limit(limit).all()
  9. def create_user(db: Session, user: schemas.UserCreate):
  10. fake_hashed_password = user.password + "notreallyhashed"
  11. db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
  12. db.add(db_user)
  13. db.commit()
  14. db.refresh(db_user)
  15. return db_user
  16. def get_items(db: Session, skip: int = 0, limit: int = 100):
  17. return db.query(models.Item).offset(skip).limit(limit).all()
  18. def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
  19. db_item = models.Item(**item.dict(), owner_id=user_id)
  20. db.add(db_item)
  21. db.commit()
  22. db.refresh(db_item)
  23. return db_item

主程序:

  1. from typing import List
  2. from fastapi import Depends, FastAPI, HTTPException
  3. from sqlalchemy.orm import Session
  4. from . import crud, models, schemas
  5. from .database import SessionLocal, engine
  6. models.Base.metadata.create_all(bind=engine)
  7. app = FastAPI()
  8. # Dependency
  9. def get_db():
  10. db = SessionLocal()
  11. try:
  12. yield db
  13. finally:
  14. db.close()
  15. @app.post("/users/", response_model=schemas.User)
  16. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  17. db_user = crud.get_user_by_email(db, email=user.email)
  18. if db_user:
  19. raise HTTPException(status_code=400, detail="Email already registered")
  20. return crud.create_user(db=db, user=user)
  21. @app.get("/users/", response_model=List[schemas.User])
  22. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  23. users = crud.get_users(db, skip=skip, limit=limit)
  24. return users
  25. @app.get("/users/{user_id}", response_model=schemas.User)
  26. def read_user(user_id: int, db: Session = Depends(get_db)):
  27. db_user = crud.get_user(db, user_id=user_id)
  28. if db_user is None:
  29. raise HTTPException(status_code=404, detail="User not found")
  30. return db_user
  31. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  32. def create_item_for_user(
  33. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  34. ):
  35. return crud.create_user_item(db=db, item=item, user_id=user_id)
  36. @app.get("/items/", response_model=List[schemas.Item])
  37. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  38. items = crud.get_items(db, skip=skip, limit=limit)
  39. return items

大应用-多文件

示例目录结构:

  1. .
  2. ├── app # "app" is a Python package
  3. │   ├── __init__.py # this file makes "app" a "Python package"
  4. │   ├── main.py # "main" module, e.g. import app.main
  5. │   ├── dependencies.py # "dependencies" module, e.g. import app.dependencies
  6. │   └── routers # "routers" is a "Python subpackage"
  7. │   │ ├── __init__.py # makes "routers" a "Python subpackage"
  8. │   │ ├── items.py # "items" submodule, e.g. import app.routers.items
  9. │   │ └── users.py # "users" submodule, e.g. import app.routers.users
  10. │   └── internal # "internal" is a "Python subpackage"
  11. │   ├── __init__.py # makes "internal" a "Python subpackage"
  12. │   └── admin.py # "admin" submodule, e.g. import app.internal.admin

APIRouter用于定义子模块的路由:

  1. from fastapi import APIRouter
  2. router = APIRouter()
  3. @router.get("/users/", tags=["users"])
  4. async def read_users():
  5. return [{"username": "Rick"}, {"username": "Morty"}]
  6. @router.get("/users/me", tags=["users"])
  7. async def read_user_me():
  8. return {"username": "fakecurrentuser"}
  9. @router.get("/users/{username}", tags=["users"])
  10. async def read_user(username: str):
  11. return {"username": username}
  1. from fastapi import APIRouter, Depends, HTTPException
  2. from ..dependencies import get_token_header
  3. router = APIRouter(
  4. prefix="/items",
  5. tags=["items"],
  6. dependencies=[Depends(get_token_header)],
  7. responses={404: {"description": "Not found"}},
  8. )
  9. fake_items_db = {"plumbus": {"name": "Plumbus"}, "gun": {"name": "Portal Gun"}}
  10. @router.get("/")
  11. async def read_items():
  12. return fake_items_db
  13. @router.get("/{item_id}")
  14. async def read_item(item_id: str):
  15. if item_id not in fake_items_db:
  16. raise HTTPException(status_code=404, detail="Item not found")
  17. return {"name": fake_items_db[item_id]["name"], "item_id": item_id}
  18. @router.put(
  19. "/{item_id}",
  20. tags=["custom"],
  21. responses={403: {"description": "Operation forbidden"}},
  22. )
  23. async def update_item(item_id: str):
  24. if item_id != "plumbus":
  25. raise HTTPException(
  26. status_code=403, detail="You can only update the item: plumbus"
  27. )
  28. return {"item_id": item_id, "name": "The great Plumbus"}

在主程序中引入子模块路由:

  1. from fastapi import Depends, FastAPI
  2. from .dependencies import get_query_token, get_token_header
  3. from .internal import admin
  4. from .routers import items, users
  5. app = FastAPI(dependencies=[Depends(get_query_token)])
  6. app.include_router(users.router)
  7. app.include_router(items.router)
  8. app.include_router(
  9. admin.router,
  10. prefix="/admin",
  11. tags=["admin"],
  12. dependencies=[Depends(get_token_header)],
  13. responses={418: {"description": "I'm a teapot"}},
  14. )
  15. @app.get("/")
  16. async def root():
  17. return {"message": "Hello Bigger Applications!"}

后台任务

使用BackgroundTasks定义后台任务:

  1. from fastapi import BackgroundTasks, FastAPI
  2. app = FastAPI()
  3. def write_notification(email: str, message=""):
  4. with open("log.txt", mode="w") as email_file:
  5. content = f"notification for {email}: {message}"
  6. email_file.write(content)
  7. @app.post("/send-notification/{email}")
  8. async def send_notification(email: str, background_tasks: BackgroundTasks):
  9. background_tasks.add_task(write_notification, email, message="some notification")
  10. return {"message": "Notification sent in the background"}

BackgroundTasks也能支持依赖注入:

  1. from typing import Union
  2. from fastapi import BackgroundTasks, Depends, FastAPI
  3. app = FastAPI()
  4. def write_log(message: str):
  5. with open("log.txt", mode="a") as log:
  6. log.write(message)
  7. def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
  8. if q:
  9. message = f"found query: {q}\n"
  10. background_tasks.add_task(write_log, message)
  11. return q
  12. @app.post("/send-notification/{email}")
  13. async def send_notification(
  14. email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
  15. ):
  16. message = f"message to {email}\n"
  17. background_tasks.add_task(write_log, message)
  18. return {"message": "Message sent"}

元数据和文档URL

设置应用元数据:

  1. from fastapi import FastAPI
  2. description = """
  3. ChimichangApp API helps you do awesome stuff.
  4. ## Items
  5. You can **read items**.
  6. ## Users
  7. You will be able to:
  8. * **Create users** (_not implemented_).
  9. * **Read users** (_not implemented_).
  10. """
  11. app = FastAPI(
  12. title="ChimichangApp",
  13. description=description,
  14. version="0.0.1",
  15. terms_of_service="http://example.com/terms/",
  16. contact={
  17. "name": "Deadpoolio the Amazing",
  18. "url": "http://x-force.example.com/contact/",
  19. "email": "dp@x-force.example.com",
  20. },
  21. license_info={
  22. "name": "Apache 2.0",
  23. "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
  24. },
  25. )
  26. @app.get("/items/")
  27. async def read_items():
  28. return [{"name": "Katana"}]

效果:

img

设置tag元数据:

  1. from fastapi import FastAPI
  2. tags_metadata = [
  3. {
  4. "name": "users",
  5. "description": "Operations with users. The **login** logic is also here.",
  6. },
  7. {
  8. "name": "items",
  9. "description": "Manage items. So _fancy_ they have their own docs.",
  10. "externalDocs": {
  11. "description": "Items external docs",
  12. "url": "https://fastapi.tiangolo.com/",
  13. },
  14. },
  15. ]
  16. app = FastAPI(openapi_tags=tags_metadata)
  17. @app.get("/users/", tags=["users"])
  18. async def get_users():
  19. return [{"name": "Harry"}, {"name": "Ron"}]
  20. @app.get("/items/", tags=["items"])
  21. async def get_items():
  22. return [{"name": "wand"}, {"name": "flying broom"}]

添加tag:

  1. from fastapi import FastAPI
  2. tags_metadata = [
  3. {
  4. "name": "users",
  5. "description": "Operations with users. The **login** logic is also here.",
  6. },
  7. {
  8. "name": "items",
  9. "description": "Manage items. So _fancy_ they have their own docs.",
  10. "externalDocs": {
  11. "description": "Items external docs",
  12. "url": "https://fastapi.tiangolo.com/",
  13. },
  14. },
  15. ]
  16. app = FastAPI(openapi_tags=tags_metadata)
  17. @app.get("/users/", tags=["users"])
  18. async def get_users():
  19. return [{"name": "Harry"}, {"name": "Ron"}]
  20. @app.get("/items/", tags=["items"])
  21. async def get_items():
  22. return [{"name": "wand"}, {"name": "flying broom"}]

效果:

img

OpenAPI的URL默认是/openapi.json,设置/api/v1/openapi.json

  1. from fastapi import FastAPI
  2. app = FastAPI(openapi_url="/api/v1/openapi.json")
  3. @app.get("/items/")
  4. async def read_items():
  5. return [{"name": "Foo"}]

文档的URL默认是/docs,设置为/documentation

  1. from fastapi import FastAPI
  2. app = FastAPI(docs_url="/documentation", redoc_url=None)
  3. @app.get("/items/")
  4. async def read_items():
  5. return [{"name": "Foo"}]

静态文件

使用StaticFiles

  1. from fastapi import FastAPI
  2. from fastapi.staticfiles import StaticFiles
  3. app = FastAPI()
  4. app.mount("/static", StaticFiles(directory="static"), name="static")

FastAPI会自动挂载静态文件。

单元测试

使用pytest和TestClient:

  1. from fastapi import FastAPI
  2. from fastapi.testclient import TestClient
  3. app = FastAPI()
  4. @app.get("/")
  5. async def read_main():
  6. return {"msg": "Hello World"}
  7. client = TestClient(app)
  8. def test_read_main():
  9. response = client.get("/")
  10. assert response.status_code == 200
  11. assert response.json() == {"msg": "Hello World"}

单元测试文件拆出来:

  1. .
  2. ├── app
  3. │   ├── __init__.py
  4. │   ├── main.py
  5. │   └── test_main.py
  1. from fastapi import FastAPI
  2. app = FastAPI()
  3. @app.get("/")
  4. async def read_main():
  5. return {"msg": "Hello World"}
  1. from fastapi.testclient import TestClient
  2. from .main import app
  3. client = TestClient(app)
  4. def test_read_main():
  5. response = client.get("/")
  6. assert response.status_code == 200
  7. assert response.json() == {"msg": "Hello World"}

调试

通常借助PyCharm打断点调试:

img

参考资料:

官方教程 https://fastapi.tiangolo.com/tutorial/

示例项目 https://fastapi.tiangolo.com/project-generation/

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/416495?site
推荐阅读
相关标签
  

闽ICP备14008679号