当前位置:   article > 正文

FastAPI 结合 JWT_fastapi jwt

fastapi jwt

FastAPI 结合 JWT

JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被篡改。JWT 通常用于认证和授权流程。

步骤

在 FastAPI 中,JWT 主要用于保护 API 路由,使其只允许经过身份验证的用户访问。认证流程大致如下:

  1. **用户登录:**用户通过提交用户名和密码获取 JWT。
  2. **获取 Token:**服务器验证用户凭据后,生成并返回 JWT 给用户。
  3. **访问受保护的路由:**用户在访问受保护的路由时,需要在请求头中携带该 JWT。
  4. **Token 验证:**服务器验证 JWT 的合法性和有效性,允许或拒绝访问受保护资源。

安装

pip install fastapi uvicorn pyjwt

  • 1
  • 2

步骤

导入必要的模块

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import jwt

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

设置配置和初始化应用

SECRET_KEY = "your_secret_key"  # 用于签名 JWT 的密钥
ALGORITHM = "HS256"             # 加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 过期时间

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

创建数据模型

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None

class UserInDB(User):
    hashed_password: str

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

实现辅助函数

生成 JWT Token

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

获取用户数据

fake_users_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "testuser@example.com",
        "hashed_password": "fakehashedpassword",
        "disabled": False,
    }
}

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

验证密码

def verify_password(plain_password, hashed_password):
    return plain_password == hashed_password

  • 1
  • 2
  • 3

获取当前用户

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except jwt.PyJWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

用户登录获取 Token

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = get_user(fake_users_db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

受保护的路由示例

@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

  • 1
  • 2
  • 3
  • 4

所有代码

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import uvicorn
import jwt
import os

# JWT 相关配置
SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3"  # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中)
ALGORITHM = "HS256"  # 使用的加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 的有效时间,以分钟为单位

app = FastAPI()  # 创建 FastAPI 应用实例

# OAuth2PasswordBearer 实例,用于依赖项
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息
fake_users_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "testuser@example.com",
        "hashed_password": "fakehashedpassword",  # 在实际应用中,存储经过哈希处理的密码
        "disabled": False,  # 用户是否被禁用
    }
}

# Pydantic 模型,用于定义请求和响应的数据结构
class Token(BaseModel):
    access_token: str  # Token 字符串
    token_type: str  # Token 类型(一般为 "bearer")

class TokenData(BaseModel):
    username: str | None = None  # 从 Token 中提取的用户名

class User(BaseModel):
    username: str  # 用户名
    email: str | None = None  # 邮箱地址,可选
    full_name: str | None = None  # 用户全名,可选
    disabled: bool | None = None  # 用户是否被禁用,可选

class UserInDB(User):
    hashed_password: str  # 存储在数据库中的哈希密码

# 生成 JWT Token 的函数
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """
    生成 JWT Token。

    参数:
    - data (dict): 要编码到 JWT 中的数据。
    - expires_delta (timedelta, 可选): Token 的过期时间。

    返回:
    - str: 编码后的 JWT 字符串。
    """
    to_encode = data.copy()  # 创建副本,以避免修改原始数据
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})  # 添加过期时间到 JWT 数据中
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # 生成 JWT
    return encoded_jwt

# 从假数据库获取用户信息
def get_user(db, username: str):
    """
    从数据库中获取用户信息。

    参数:
    - db (dict): 用户数据库(在本例中为假数据)。
    - username (str): 用户名。

    返回:
    - UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。
    """
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

# 验证用户密码
def verify_password(plain_password, hashed_password):
    """
    验证用户密码。

    参数:
    - plain_password (str): 用户输入的明文密码。
    - hashed_password (str): 存储在数据库中的哈希密码。

    返回:
    - bool: 密码匹配返回 True,否则返回 False。
    """
    return plain_password == hashed_password  # 在实际应用中,这里应该使用哈希函数进行比较

# 验证 Token 并获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """
    从 JWT Token 中提取用户信息,并验证 Token 的合法性。

    参数:
    - token (str): JWT Token。

    返回:
    - User: 返回当前用户信息。

    抛出:
    - HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])  # 解码 JWT
        username: str = payload.get("sub")  # 获取 JWT 中的用户名
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except jwt.PyJWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)  # 获取用户信息
    if user is None:
        raise credentials_exception
    return user

# 验证用户是否被禁用
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """
    验证当前用户是否被禁用。

    参数:
    - current_user (User): 当前用户信息。

    返回:
    - User: 如果用户未被禁用,返回用户信息。

    抛出:
    - HTTPException: 当用户被禁用时抛出 400 错误。
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# 用户登录获取 Token
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """
    用户登录接口,用于获取 JWT Token。

    参数:
    - form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。

    返回:
    - dict: 包含 access_token 和 token_type 的响应数据。

    抛出:
    - HTTPException: 当用户名或密码错误时抛出 401 错误。
    """
    user = get_user(fake_users_db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # 设置 Token 过期时间
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

# 受保护的路由示例
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """
    获取当前用户信息的受保护路由。

    参数:
    - current_user (User): 当前登录的用户信息(通过 JWT 验证)。

    返回:
    - User: 返回当前用户的信息。
    """
    return current_user

# 运行应用
if __name__ == "__main__":
    uvicorn.run(
        f"{os.path.basename(__file__).split('.')[0]}:app",
        host="127.0.0.1",
        port=8000,
        reload=True,  # 启用自动重载
    )

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198

测试

获取 Token

在这里插入图片描述

访问受保护的路由

token正确

在这里插入图片描述

token错误

在这里插入图片描述

总结

JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。

在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的合法性后,允许用户访问受保护的资源。

实现 JWT 认证的步骤包括安装必要的依赖项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。

  • JWT 配置:包括密钥(SECRET_KEY)和加密算法(ALGORITHM),用于生成和验证令牌。
  • 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处理认证逻辑。
  • API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只允许携带有效 JWT 的请求访问。

使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有经过身份验证的用户才能访问受保护的 API 路由。

注意

示例中密码并没有经过hash加密实际应用中要加密的

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/990618
推荐阅读
相关标签
  

闽ICP备14008679号