赞
踩
FastAPI官方推荐使用python-jose
OAuth2 实现密码哈希与 Bearer + JWT Token 验证
先安装
pipenv install python-jose
还要安装passlib
passlib
是处理密码加密和验证的Python包,此处推荐的算法是Bcrypt
pipenv install bcrypt
pipenv install passlib
定义用户相关的模型,数据集
from pydantic import BaseModel # 用户模型 class User(BaseModel): username: str email: str | None = None full_name: str | None = None disabled: bool | None = None # 包装用户的hashed_password字段 class UserHashedPassword(User): hashed_password: str # 用于模拟查询数据库返回的用户集合 fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } } # 从数据库中获取用户信息 def get_user(db, username: str): if username in db: user_dict = db[username] return UserHashedPassword(**user_dict)
定义Token相关模型
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
定义密码加密和验证等相关函数
from passlib.context import CryptContext # CryptContext对象将提供密码加密和验证功能 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # 使用CryptContext对象对用户的password进行哈希加密 def get_password_hash(password): return pwd_context.hash(password) # 使用CryptContext对象对用户的password进行验证 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 验证用户的密码的正确性 def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user
定义获取用户资料等相关函数,其中tokenUrl="token"
指明/token
路径获取token
from typing import Annotated from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user(current_user: Annotated[User, Depends(get_current_user)],): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user
定义创建token
的函数
from jose import jwt, JWTError from datetime import datetime, timedelta, timezone SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 def create_access_token(data: dict, expires_delta: timedelta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)): plain_text = data.copy() expire = datetime.now(timezone.utc) + expires_delta plain_text.update({"exp": expire}) encoded_jwt = jwt.encode(plain_text, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
根据username
和password
获取token
from fastapi import FastAPI app = FastAPI() @app.post("/token",summary = "使用密码模式获取token") async def login_by_username_password( form_data: Annotated[OAuth2PasswordRequestForm, Depends()], ) -> Token: user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return Token(access_token=access_token, token_type="bearer")
JWT 规范要求sub
键放置username
的值
打开http://127.0.0.1:8000/docs 可以看到获取token
需要提供username
和password
把username:johndoe
和password:secret
提交到http://127.0.0.1:8000/token 来获取token
通过token获取当前用户的信息
@app.get("/users/me", response_model = User, summary = "获取当前用户的信息")
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return current_user
点击Close按钮,接着点击Try it out
打开浏览器的开发者工具,可以看到数据是怎么发送过去的
完整代码
# -*- coding:utf-8 –*- from pydantic import BaseModel # 用户模型 class User(BaseModel): username: str email: str | None = None full_name: str | None = None disabled: bool | None = None # 包装用户的hashed_password字段 class UserHashedPassword(User): hashed_password: str # 用于模拟查询数据库返回的用户集合 fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } } # 从数据库中获取用户信息 def get_user(db, username: str): if username in db: user_dict = db[username] return UserHashedPassword(**user_dict) from passlib.context import CryptContext # CryptContext对象将提供密码加密和验证功能 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # 使用CryptContext对象对用户的password进行哈希加密 def get_password_hash(password): return pwd_context.hash(password) # 使用CryptContext对象对用户的password进行验证 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 验证用户的密码的正确性 def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user from typing import Annotated from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user(current_user: Annotated[User, Depends(get_current_user)], ): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: str | None = None from jose import jwt, JWTError from datetime import datetime, timedelta, timezone SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 def create_access_token(data: dict, expires_delta: timedelta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)): plain_text = data.copy() expire = datetime.now(timezone.utc) + expires_delta plain_text.update({"exp": expire}) encoded_jwt = jwt.encode(plain_text, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt from fastapi import FastAPI app = FastAPI() @app.post("/token", summary="使用密码模式获取token") async def login_by_username_password( form_data: Annotated[OAuth2PasswordRequestForm, Depends()], ) -> Token: user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return Token(access_token=access_token, token_type="bearer") @app.post("/token", summary = "使用密码模式获取token") async def login_by_username_password( form_data: Annotated[OAuth2PasswordRequestForm, Depends()], ) -> Token: user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return Token(access_token=access_token, token_type="bearer") @app.get("/users/me", response_model = User, summary = "获取当前用户的信息") async def read_users_me( current_user: Annotated[User, Depends(get_current_active_user)], ): return current_user
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。