赞
踩
JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被篡改。JWT 通常用于认证和授权流程。
在 FastAPI 中,JWT 主要用于保护 API 路由,使其只允许经过身份验证的用户访问。认证流程大致如下:
pip install fastapi uvicorn pyjwt
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import jwt
SECRET_KEY = "your_secret_key" # 用于签名 JWT 的密钥
ALGORITHM = "HS256" # 加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 过期时间
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
fake_users_db = {
"testuser": {
"username": "testuser",
"full_name": "Test User",
"email": "testuser@example.com",
"hashed_password": "fakehashedpassword",
"disabled": False,
}
}
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def verify_password(plain_password, hashed_password):
return plain_password == hashed_password
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except jwt.PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = get_user(fake_users_db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import uvicorn
import jwt
import os
# JWT 相关配置
SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3" # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中)
ALGORITHM = "HS256" # 使用的加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 的有效时间,以分钟为单位
app = FastAPI() # 创建 FastAPI 应用实例
# OAuth2PasswordBearer 实例,用于依赖项
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息
fake_users_db = {
"testuser": {
"username": "testuser",
"full_name": "Test User",
"email": "testuser@example.com",
"hashed_password": "fakehashedpassword", # 在实际应用中,存储经过哈希处理的密码
"disabled": False, # 用户是否被禁用
}
}
# Pydantic 模型,用于定义请求和响应的数据结构
class Token(BaseModel):
access_token: str # Token 字符串
token_type: str # Token 类型(一般为 "bearer")
class TokenData(BaseModel):
username: str | None = None # 从 Token 中提取的用户名
class User(BaseModel):
username: str # 用户名
email: str | None = None # 邮箱地址,可选
full_name: str | None = None # 用户全名,可选
disabled: bool | None = None # 用户是否被禁用,可选
class UserInDB(User):
hashed_password: str # 存储在数据库中的哈希密码
# 生成 JWT Token 的函数
def create_access_token(data: dict, expires_delta: timedelta | None = None):
"""
生成 JWT Token。
参数:
- data (dict): 要编码到 JWT 中的数据。
- expires_delta (timedelta, 可选): Token 的过期时间。
返回:
- str: 编码后的 JWT 字符串。
"""
to_encode = data.copy() # 创建副本,以避免修改原始数据
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire}) # 添加过期时间到 JWT 数据中
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # 生成 JWT
return encoded_jwt
# 从假数据库获取用户信息
def get_user(db, username: str):
"""
从数据库中获取用户信息。
参数:
- db (dict): 用户数据库(在本例中为假数据)。
- username (str): 用户名。
返回:
- UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。
"""
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
# 验证用户密码
def verify_password(plain_password, hashed_password):
"""
验证用户密码。
参数:
- plain_password (str): 用户输入的明文密码。
- hashed_password (str): 存储在数据库中的哈希密码。
返回:
- bool: 密码匹配返回 True,否则返回 False。
"""
return plain_password == hashed_password # 在实际应用中,这里应该使用哈希函数进行比较
# 验证 Token 并获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""
从 JWT Token 中提取用户信息,并验证 Token 的合法性。
参数:
- token (str): JWT Token。
返回:
- User: 返回当前用户信息。
抛出:
- HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # 解码 JWT
username: str = payload.get("sub") # 获取 JWT 中的用户名
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except jwt.PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username) # 获取用户信息
if user is None:
raise credentials_exception
return user
# 验证用户是否被禁用
async def get_current_active_user(current_user: User = Depends(get_current_user)):
"""
验证当前用户是否被禁用。
参数:
- current_user (User): 当前用户信息。
返回:
- User: 如果用户未被禁用,返回用户信息。
抛出:
- HTTPException: 当用户被禁用时抛出 400 错误。
"""
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# 用户登录获取 Token
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""
用户登录接口,用于获取 JWT Token。
参数:
- form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。
返回:
- dict: 包含 access_token 和 token_type 的响应数据。
抛出:
- HTTPException: 当用户名或密码错误时抛出 401 错误。
"""
user = get_user(fake_users_db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 设置 Token 过期时间
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# 受保护的路由示例
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
"""
获取当前用户信息的受保护路由。
参数:
- current_user (User): 当前登录的用户信息(通过 JWT 验证)。
返回:
- User: 返回当前用户的信息。
"""
return current_user
# 运行应用
if __name__ == "__main__":
uvicorn.run(
f"{os.path.basename(__file__).split('.')[0]}:app",
host="127.0.0.1",
port=8000,
reload=True, # 启用自动重载
)
JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。
在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的合法性后,允许用户访问受保护的资源。
实现 JWT 认证的步骤包括安装必要的依赖项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。
- JWT 配置:包括密钥(
SECRET_KEY
)和加密算法(ALGORITHM
),用于生成和验证令牌。- 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处理认证逻辑。
- API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只允许携带有效 JWT 的请求访问。
使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有经过身份验证的用户才能访问受保护的 API 路由。
示例中
密码并没有经过hash加密
,实际应用中要加密的
。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。