赞
踩
from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import jwt, JWTError from pydantic import BaseModel from starlette import status from passlib.context import CryptContext app16 = APIRouter() """OAuth2 with Password (and hashing), Bearer with JWT tokens 开发基于JSON Web Tokens的认证""" fake_users_db = {} fake_users_db.update({ "john snow": { "username": "john snow", "full_name": "John Snow", "email": "johnsnow@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } }) SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # 生成密钥 openssl rand -hex 32 ALGORITHM = "HS256" # 算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30 # 访问令牌过期分钟 class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str class Token(BaseModel): """返回给用户的Token""" access_token: str token_type: str pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter16/jwt/token") def verity_password(plain_password: str, hashed_password: str): """对密码进行校验""" return pwd_context.verify(plain_password, hashed_password) def jwt_get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def jwt_authenticate_user(db, username: str, password: str): user = jwt_get_user(db=db, username=username) if not user: return False if not verity_password(plain_password=password, hashed_password=user.hashed_password): return False return user def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt @app16.post("/jwt/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = jwt_authenticate_user(db=fake_users_db, username=form_data.username, password=form_data.password) if not user: raise HTTPException( status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} async def jwt_get_current_user(token: str = Depends(oauth2_schema)): credentials_exception = HTTPException( status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception user = jwt_get_user(db=fake_users_db, username=username) if user is None: raise credentials_exception return user async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)): if current_user.disabled: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user") return current_user @app16.get("/jwt/users/me") async def jwt_read_users_me(current_user: User = Depends(jwt_get_current_active_user)): return current_user
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。