赞
踩
pip install PyJWT cryptography
# -*- coding:utf-8 _*_ """ @Project : Qingyuing-Server @File : jwtUtil.py @Author : 晴天 @CreateTime : 2023-12-27 10:28 """ import os import jwt import base64 from uuid import uuid4 from datetime import datetime, timedelta, timezone from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization def generate_or_load_rsa_keys(private_key_path='private_key.pem', public_key_path='public_key.pem') -> None: """ 生成私钥/公钥用于生成jwt :param private_key_path: 私钥 :param public_key_path: 公钥 """ # 检查私钥和公钥文件是否已存在 if not (os.path.exists(private_key_path) and os.path.exists(public_key_path)): # 生成新的密钥对 private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) # 获取私钥的PEM格式 private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) # 获取公钥的PEM格式 public_key = private_key.public_key() public_pem = public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) # 将私钥和公钥写入文件 with open(private_key_path, 'wb') as f: f.write(private_pem) with open(public_key_path, 'wb') as f: f.write(public_pem) print('私钥公钥已存在跳过生成') def read_private_key(private_key_path='private_key.pem', public_key_path='public_key.pem'): """ 读取现有的私钥和公钥 :param private_key_path: 私钥 :param public_key_path: 公钥 :return: 私钥和公钥 """ try: with open(private_key_path, 'rb') as f: private_pem = f.read() with open(public_key_path, 'rb') as f: public_pem = f.read() return private_pem, public_pem except FileNotFoundError: print('私钥或公钥读取异常') def create_tokens(user_id, client_id): """ 生成JWT令牌 :param user_id: 用户ID :param client_id: 客户端 :return: 返回生成的access_token跟refresh_token """ private_key = read_private_key()[0] current_time = datetime.now(timezone.utc) # 使用带时区的UTC时间 jti_access = base64.urlsafe_b64encode(uuid4().bytes).rstrip(b'=').decode('utf-8') # 生成access_token的唯一标识符 jti_refresh = base64.urlsafe_b64encode(uuid4().bytes).rstrip(b'=').decode('utf-8') # 生成refresh_token的唯一标识符 # 创建access_token的payload access_payload = { 'exp': current_time + timedelta(hours=24), # 设置24小时后过期 'iat': current_time, 'user_id': user_id, 'jti': jti_access, 'client_id': client_id, 'scope': ['openid'] } # 创建refresh_token的payload refresh_payload = { 'exp': current_time + timedelta(days=7), # 设置7天后过期 'iat': current_time, 'user_id': user_id, 'jti': jti_refresh, 'client_id': client_id, 'scope': ['openid'] } # 使用RS256算法和私钥签署JWT access_token = jwt.encode(access_payload, private_key, algorithm='RS256') refresh_token = jwt.encode(refresh_payload, private_key, algorithm='RS256') return access_token, refresh_token def parse_jwt(token): public_key = read_private_key()[1] try: # 使用公钥验证JWT decoded = jwt.decode(token, public_key, algorithms=['RS256']) return decoded # 令牌有效,返回解码后的payload except jwt.ExpiredSignatureError: # 令牌过期 return "Token expired" except jwt.InvalidTokenError: # 令牌无效 return "Invalid token" except jwt.InvalidSignatureError: # 签名无效 return "Invalid signature" except Exception as e: # 其他异常 return str(f'其他异常: {e}') if __name__ == '__main__': accessToken, refreshToken = create_tokens('52111890', 'web_app') print(f"AccessToken: {accessToken}") print(f"RefreshToken: {refreshToken}") decoded_access_token = parse_jwt(accessToken) print('解析结果: ', decoded_access_token) decoded_refresh_token = parse_jwt(refreshToken) print('解析结果: ', decoded_refresh_token)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。